[GitHub] [flink] wuchong commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
wuchong commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState URL: https://github.com/apache/flink/pull/8909#issuecomment-506625995 Thanks @Myasuka. LGTM, +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298473800 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An implementation of {@link BoundedData} that writes directly into a File Channel. 
+ * The readers are simple file channel readers using a simple dedicated buffer pool. + */ +final class FileChannelBoundedData implements BoundedData { + + private final Path filePath; + + private final FileChannel fileChannel; + + private final ByteBuffer[] headerAndBufferArray; + + private long size; + + private final int memorySegmentSize; + + FileChannelBoundedData( + Path filePath, + FileChannel fileChannel, + int memorySegmentSize) { + + this.filePath = checkNotNull(filePath); + this.fileChannel = checkNotNull(fileChannel); + this.memorySegmentSize = memorySegmentSize; + this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray(); + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, buffer, headerAndBufferArray); + } + + @Override + public void finishWrite() throws IOException { + fileChannel.close(); + } + + @Override + public Reader createReader() throws IOException { + checkState(!fileChannel.isOpen()); + + final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ); + return new FileBufferReader(fc, memorySegmentSize); + } + + @Override + public long getSize() { + return size; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(fileChannel); + Files.delete(filePath); + } + + // + + public static FileChannelBoundedData create(Path filePath, int memorySegmentSize) throws IOException { + final FileChannel fileChannel = FileChannel.open( + filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + return new FileChannelBoundedData( + filePath, + fileChannel, + memorySegmentSize); + } + + // + + static final class FileBufferReader implements BoundedData.Reader, BufferRecycler { + + private static final int NUM_BUFFERS = 2; + + private final FileChannel fileChannel; + + private final ByteBuffer headerBuffer; + + private final ArrayDeque buffers; + + FileBufferReader(FileChannel fileChannel, 
int bufferSize) { + this.fileChannel = checkNotNull(fileChannel); + this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); + th
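The quoted `FileChannelBoundedData` writes each buffer together with a header through a gathering write (`BufferReaderWriterUtil.writeToByteChannel` with a header-and-buffer array). It reads the data back only after writing has finished, through a separate read-only `FileChannel`. Below is a minimal sketch of that write-then-read pattern using plain `java.nio`. It rests on two assumptions. First, a hypothetical 4-byte length header stands in for the real header layout, which `BufferReaderWriterUtil` defines and the diff does not show. Second, each read allocates a fresh buffer rather than recycling the reader's small dedicated pool:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: length-prefixed buffers written with a gathering write and
 * read back sequentially. The 4-byte length header is an assumption for
 * illustration, not BufferReaderWriterUtil's actual format.
 */
final class LengthPrefixedChannelSketch {

    /** Writes one buffer as [int length][payload]; returns the number of bytes written. */
    static long writeBuffer(FileChannel channel, ByteBuffer payload) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(Integer.BYTES);
        header.putInt(payload.remaining());
        header.flip();
        ByteBuffer[] headerAndPayload = {header, payload};
        long expected = (long) header.remaining() + payload.remaining();
        long written = 0;
        // A gathering write may be partial, so loop until both buffers are drained.
        while (written < expected) {
            written += channel.write(headerAndPayload);
        }
        return written;
    }

    /** Opens the finished file read-only and returns every payload in write order. */
    static List<byte[]> readAll(Path file) throws IOException {
        List<byte[]> result = new ArrayList<>();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer header = ByteBuffer.allocate(Integer.BYTES);
            while (true) {
                header.clear();
                if (!readFully(channel, header)) {
                    break; // clean end of file
                }
                header.flip();
                ByteBuffer payload = ByteBuffer.allocate(header.getInt());
                if (!readFully(channel, payload)) {
                    throw new IOException("truncated payload");
                }
                result.add(payload.array());
            }
        }
        return result;
    }

    /** Fills dst completely; returns false only if EOF is reached before any byte is read. */
    private static boolean readFully(FileChannel channel, ByteBuffer dst) throws IOException {
        while (dst.hasRemaining()) {
            if (channel.read(dst) < 0) {
                if (dst.position() == 0) {
                    return false;
                }
                throw new IOException("unexpected end of file");
            }
        }
        return true;
    }
}
```

As in the quoted `create()` method, the writer side would open its channel with `CREATE_NEW` and `WRITE`. The channel is closed before `readAll` is called, mirroring the `checkState(!fileChannel.isOpen())` guard in `createReader()`.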
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298473554 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark this is a temporal tablefunction join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition. 
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + + def isRowtimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 3 + } + + def isProctimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 2 + } + + def makeRowTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + + + def containsTemporalJoinCondition(condition: RexNode): Boolean = { Review comment: Sorry, I misunderstood it.
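In the quoted `TemporalJoinUtil`, the arity of the `TEMPORAL_JOIN_CONDITION` call encodes the time semantics. Three operands (left time attribute, right time attribute, right primary key) mark an event-time join, and two operands (left time attribute, right primary key) mark a processing-time join. The Calcite-free sketch below models that convention. `Node` is a hypothetical stand-in for `RexNode`, not a Flink or Calcite class. The body of `containsTemporalJoinCondition` is cut off in the diff, so the recursive search shown here is an assumption:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, Calcite-free model of the TEMPORAL_JOIN_CONDITION marker call.
 * Node stands in for RexNode; it is not a Flink or Calcite class.
 */
final class TemporalConditionSketch {

    static final String TEMPORAL_JOIN_CONDITION = "__TEMPORAL_JOIN_CONDITION";

    /** Either a leaf field reference or a call with an operator and operands. */
    static final class Node {
        final String label;        // field name for references, operator name for calls
        final boolean isCall;
        final List<Node> operands;

        private Node(String label, boolean isCall, List<Node> operands) {
            this.label = label;
            this.isCall = isCall;
            this.operands = operands;
        }

        static Node ref(String field) {
            return new Node(field, false, Collections.<Node>emptyList());
        }

        static Node call(String operator, Node... operands) {
            return new Node(operator, true, Arrays.asList(operands));
        }
    }

    // Event time: (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
    static Node rowtimeCondition(Node leftTime, Node rightTime, Node rightKey) {
        return Node.call(TEMPORAL_JOIN_CONDITION, leftTime, rightTime, rightKey);
    }

    // Processing time: (LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)
    static Node proctimeCondition(Node leftTime, Node rightKey) {
        return Node.call(TEMPORAL_JOIN_CONDITION, leftTime, rightKey);
    }

    static boolean isRowtimeCall(Node call) {
        checkMarker(call);
        return call.operands.size() == 3;
    }

    static boolean isProctimeCall(Node call) {
        checkMarker(call);
        return call.operands.size() == 2;
    }

    /** Assumed behavior: true if the marker call appears anywhere in the condition tree. */
    static boolean containsTemporalJoinCondition(Node condition) {
        if (condition.isCall && TEMPORAL_JOIN_CONDITION.equals(condition.label)) {
            return true;
        }
        for (Node operand : condition.operands) {
            if (containsTemporalJoinCondition(operand)) {
                return true;
            }
        }
        return false;
    }

    // Mirrors checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION).
    private static void checkMarker(Node call) {
        if (!call.isCall || !TEMPORAL_JOIN_CONDITION.equals(call.label)) {
            throw new IllegalArgumentException("Not a temporal join condition: " + call.label);
        }
    }
}
```

The recursive search matters because the marker is typically combined with ordinary equi-join predicates under an `AND`. A check on the top-level operator alone would miss it.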
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298472910 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark this is a temporal tablefunction join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition. 
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + + def isRowtimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 3 + } + + def isProctimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 2 + } + + def makeRowTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + + + def containsTemporalJoinCondition(condition: RexNode): Boolean = { Review comment: separate method? I'm just suggesting that you get rid of this class.
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298472375 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.physical.stream + +import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical._ +import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin +import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition +import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.JoinRelType + +import java.util + +class StreamExecTemporalUdtfJoinRule + extends RelOptRule( +operand( + classOf[FlinkLogicalJoin], Review comment: `LogicalCorrelateToLookupJoinRule` and `LogicalCorrelateToTemporalJoinRule`?
[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506621740 Currently, Flink doesn't notify tasks about expired checkpoints; [FLINK-8871](https://issues.apache.org/jira/browse/FLINK-8871) will fix that. Once FLINK-8871 has been fixed, we can do better here. Thanks for your comments.
[GitHub] [flink] wuchong commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#issuecomment-506621197
> Can you describe the rule process of LookupJoin and TemporalTableJoin separately?
> And I strongly suggest you change their class names, because they really confuse me...

FOR SYSTEM_TIME AS OF:
1. Logical: `LogicalCorrelate` with `LogicalSnapshot`
2. `LogicalCorrelateToJoinFromTemporalTableRule`
3. FlinkLogical: `LogicalJoin` with `LogicalSnapshot`
4. `StreamExecLookupJoinRule`
5. Physical: `StreamExecLookupJoin`

LATERAL udtf(left.proctime):
1. Logical: `LogicalCorrelate` with a `TemporalTableFunction`
2. `LogicalCorrelateToJoinFromTemporalTableFunctionRule`
3. FlinkLogical: `LogicalJoin` with a special join condition `TEMPORAL_JOIN_CONDITION`
4. `StreamExecTemporalJoinRule`
5. Physical: `StreamExecTemporalJoin`
[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006 The onCompletionPromise does complete with an exception if the pendingCheckpoint has been aborted, but the job **will not** exit (why would the job exit because of a checkpoint timeout?), and its tasks will not resume execution because you **don't** notify the syncLatch. As for the second part, I don't want the tasks to exit. I just want them to unblock and resume execution when the sync checkpoint/savepoint times out.
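The behavior requested here can be sketched as a latch with two outcomes. Tasks blocked on a synchronous savepoint should be released when the savepoint is aborted or times out, and they should resume rather than exit. `SyncSavepointLatchSketch`, `Outcome`, `complete()`, `abort()` and `await()` are hypothetical names used for illustration. They are not Flink's classes or the PR's code:

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical latch (names are illustrative, not Flink's API): a task thread parks
 * in await() after triggering a synchronous savepoint and is released when the
 * savepoint either completes or is aborted, e.g. because it expired.
 */
final class SyncSavepointLatchSketch {

    enum Outcome { PENDING, COMPLETED, ABORTED }

    private Outcome outcome = Outcome.PENDING;   // guarded by "this"

    /** Savepoint succeeded: the task may go on to terminate or suspend. */
    synchronized void complete() {
        resolve(Outcome.COMPLETED);
    }

    /** Savepoint aborted or expired: the task should unblock and keep processing. */
    synchronized void abort() {
        resolve(Outcome.ABORTED);
    }

    // Only called from synchronized methods, so the monitor is held for notifyAll().
    private void resolve(Outcome result) {
        if (outcome == Outcome.PENDING) {   // the first signal wins
            outcome = result;
            notifyAll();                    // wake every waiting task thread
        }
    }

    /** Blocks until resolved or until the timeout elapses; returns the outcome seen. */
    synchronized Outcome await(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (outcome == Outcome.PENDING) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return outcome;
    }
}
```

In the scenario described above, the code path that learns the pending checkpoint was aborted would call `abort()`. The blocked tasks would then observe `ABORTED` and resume, instead of waiting indefinitely on an unsignalled latch.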
[jira] [Commented] (FLINK-12921) Flink Job Scheduling
[ https://issues.apache.org/jira/browse/FLINK-12921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874714#comment-16874714 ] vim commented on FLINK-12921:
[~knaufk] Thank you for your reply.

> Flink Job Scheduling
>
> Key: FLINK-12921
> URL: https://issues.apache.org/jira/browse/FLINK-12921
> Project: Flink
> Issue Type: Improvement
> Components: API / DataSet, API / DataStream
> Affects Versions: 1.8.0
> Environment: flink 1.8.0
> jdk8
> Reporter: vim
> Priority: Major
>
> This is an example from flink-docs:
> Consider a program with a data source, a _MapFunction_, and a _ReduceFunction_. The source and MapFunction are executed with a parallelism of 4, while the ReduceFunction is executed with a parallelism of 3. A pipeline consists of the sequence Source - Map - Reduce. On a cluster with 2 TaskManagers with 3 slots each, the program will be executed as described below.
> !https://ci.apache.org/projects/flink/flink-docs-release-1.8/fig/slots.svg!
> But when I tried it, I found that it was not like this. In my result, TaskManager 1 used 3 slots, but TaskManager 2 used only 1 slot. Has anyone else tested this example?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
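The slot arithmetic behind the documented example can be checked with a simplified model. Assume the default setup in which all operators share one slot-sharing group. Flink then needs as many slots as the highest operator parallelism, here max(4, 4, 3) = 4. How those 4 slots are placed on two 3-slot TaskManagers depends on the placement policy. The two policies below are illustrative only and are not Flink's scheduler code. A fill-first placement reproduces the 3 + 1 split reported above:

```java
import java.util.Arrays;

/**
 * Simplified, hypothetical model of slot usage for the FLINK-12921 example.
 * Neither placement policy is Flink's actual scheduler implementation.
 */
final class SlotSharingSketch {

    /** With one slot-sharing group, the job needs max(operator parallelism) slots. */
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    /** Illustrative policy: fill each TaskManager completely before using the next. */
    static int[] fillFirst(int slotsNeeded, int taskManagers, int slotsPerTaskManager) {
        checkCapacity(slotsNeeded, taskManagers, slotsPerTaskManager);
        int[] used = new int[taskManagers];
        for (int slot = 0; slot < slotsNeeded; slot++) {
            used[slot / slotsPerTaskManager]++;
        }
        return used;
    }

    /** Illustrative policy: hand out slots round-robin across TaskManagers. */
    static int[] spreadOut(int slotsNeeded, int taskManagers, int slotsPerTaskManager) {
        checkCapacity(slotsNeeded, taskManagers, slotsPerTaskManager);
        int[] used = new int[taskManagers];
        for (int slot = 0; slot < slotsNeeded; slot++) {
            used[slot % taskManagers]++;
        }
        return used;
    }

    private static void checkCapacity(int slotsNeeded, int taskManagers, int slotsPerTaskManager) {
        if (slotsNeeded > taskManagers * slotsPerTaskManager) {
            throw new IllegalArgumentException("not enough slots: need " + slotsNeeded);
        }
    }
}
```

For the example, `fillFirst(requiredSlots(4, 4, 3), 2, 3)` returns `{3, 1}` and `spreadOut(requiredSlots(4, 4, 3), 2, 3)` returns `{2, 2}`. Both use the same 4 slots and differ only in where those slots land.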
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298468317 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/TemporalUdtfJoinTest.scala ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.stream.sql.join + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase} +import org.hamcrest.Matchers.containsString +import org.junit.Test + +class TemporalUdtfJoinTest extends TableTestBase { Review comment: Rename to `TemporalJoinTest` to align with `TemporalJoinITCase`.
[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006 The onCompletionPromise does complete with an exception if the pendingCheckpoint has been aborted, but the job will not exit, and its tasks will not resume execution because you **don't** notify the syncLatch. As for the second part, I don't want the tasks to exit. I just want them to unblock and resume execution when the sync checkpoint/savepoint times out.
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298467837 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.physical.stream + +import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical._ +import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin +import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition +import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.JoinRelType + +import java.util + +class StreamExecTemporalUdtfJoinRule + extends RelOptRule( +operand( + classOf[FlinkLogicalJoin], Review comment: Both the lookup join and the temporal table function join are correlates before optimization, so we need to convert them to joins in the first step. I renamed the rules to `LogicalCorrelateToJoinFromTemporalTableRule` and `LogicalCorrelateToJoinFromTemporalTableFunctionRule`. What do you think?
[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506617887 @azagrebin @zentol I submitted another hotfix commit that removes `IOManager#isProperlyShutDown`.
[GitHub] [flink] yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006 The onCompletionPromise does complete with an exception if the pendingCheckpoint has been aborted, but the job will not exit, and its tasks will not resume execution because you **don't** notify the syncLatch. As for the second part, I don't want the tasks to exit. I just want them to unblock and resume execution.
[GitHub] [flink] asfgit closed pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities
asfgit closed pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities URL: https://github.com/apache/flink/pull/8913
[GitHub] [flink] zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#discussion_r298457662 ## File path: tools/travis/stage.sh ## @@ -212,6 +219,9 @@ function get_test_modules_for_stage() { (${STAGE_TESTS}) echo "-pl $modules_tests" ;; +(${STAGE_CONNECTOR_HIVE_1}) +echo "-pl $MODULES_CONNECTOR_HIVE -Phive-1.2.1" Review comment: On a side note, if you included `clean` here you would only recompile the Hive module, but it's quite a hack.
[GitHub] [flink] zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506606238 To integrate it into the Hadoop profile, modify the profiles of these [builds](https://github.com/apache/flink/blob/master/.travis.yml#L119) to include the Hive version.
[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile
[ https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874685#comment-16874685 ] Chesnay Schepler commented on FLINK-10701:
The long-term goal is still to have Kafka contained in a single profile, which still isn't the case.

> Move modern kafka connector module into connector profile
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kafka, Tests
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly added to the {{connector}} profile in {{stage.sh}}. Click [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to see more details.
> *This issue is blocked by FLINK-10603.*
[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI
[ https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874684#comment-16874684 ] Dian Fu commented on FLINK-13011:
[~sunjincheng121] [~Zentol] I have contacted the owner of [https://pypi.org/project/pyflink/] and have obtained ownership of this project.

> Release the PyFlink into PyPI
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python, Build System
> Affects Versions: 1.9.0
> Reporter: sunjincheng
> Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI.
> https://pypi.org/
> https://packaging.python.org/tutorials/packaging-projects/
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298456137 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.temporal; + +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; + +/** + * This operator works by keeping, in state, a collection of probe and build records to process + * on the next watermark. The idea is that between watermarks we collect those elements, + * and once we are sure that there will be no more updates we emit the correct result and clean up the + * state.
+ * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older than the current watermark. The build side is cleaned up in a similar fashion, + * however we always keep at least one record (the latest one), even if it is past the last + * watermark. + * + * One more trick is how emitting results and cleaning up are triggered. This is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when the watermark exceeds this timer, we emit results and/or + * clean up the state). However, this would cause a huge number of registered timers. For example, + * with the accumulated probe records having event times {1, 2, 5, 8, 9}, if we + * received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that, + * we always keep only one registered timer for any given key, registered for the minimal + * value. When it fires, we process all records with event times older than or equal to + * currentWatermark. + */ +public class TemporalRowTimeJoinOperator Review comment: I will support state TTL in `TemporalRowTimeJoinOperator` too.
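The single-timer strategy described in the Javadoc above can be sketched in plain Java. This is a minimal, hypothetical model of one key's probe-side buffer, not the operator's actual code. The class and method names are invented for illustration, records are plain strings, and the build-side rule (always keep the latest record) is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, heavily simplified model of ONE key's probe-side state in a
 * rowtime temporal join. Not Flink code: it only illustrates keeping a single
 * timer per key, registered for the minimal buffered event time.
 */
class SingleTimerPerKeySketch {

    // Buffered probe records for one key, ordered by event time.
    private final TreeMap<Long, List<String>> probeRecords = new TreeMap<>();

    // The one registered timer for this key; null means no timer is registered.
    private Long registeredTimer = null;

    // How many times the timer actually fired (for illustration only).
    private int firings = 0;

    /** Buffers a record and moves the timer only if this record is the new minimum. */
    void addProbeRecord(long eventTime, String record) {
        probeRecords.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(record);
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime;
        }
    }

    /**
     * Called when a watermark arrives. If the single timer is due, emits every record
     * with eventTime <= watermark in one go, cleans them up, and re-registers the
     * timer for the smallest remaining event time.
     */
    List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > watermark) {
            return emitted; // timer not due yet
        }
        firings++;
        // View of all entries with key <= watermark; clearing it removes them from the map.
        NavigableMap<Long, List<String>> due = probeRecords.headMap(watermark, true);
        for (List<String> records : due.values()) {
            emitted.addAll(records);
        }
        due.clear();
        registeredTimer = probeRecords.isEmpty() ? null : probeRecords.firstKey();
        return emitted;
    }

    Long registeredTimer() {
        return registeredTimer;
    }

    int firings() {
        return firings;
    }
}
```

With the event times {1, 2, 5, 8, 9} from the Javadoc, a single Watermark(10) fires this key's timer once and emits all five records, rather than triggering five separate timers.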
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298454608 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java ## @@ -117,7 +117,9 @@ public void open() throws Exception { @Override public void close() throws Exception { super.close(); - joinCondition.backingJoinCondition.close(); + if (joinCondition != null) { Review comment: I think we should guard it in every close() method. The compiled instance may not have been instantiated yet when close() is invoked (for example, if an exception happens before instantiation). This change is not related to this pull request.
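The guard suggested in the review can be sketched as follows. `CompiledCondition` and `NullSafeCloseOperatorSketch` are hypothetical names that stand in for the generated join condition and the operator. The sketch only shows that `close()` may run after an `open()` that failed before the compiled instance was created.

```java
/** Hypothetical stand-in for a code-generated join condition that owns resources. */
interface CompiledCondition extends AutoCloseable {
    @Override
    void close() throws Exception;
}

/**
 * Sketch (not Flink code) of an operator whose compiled member is created in open().
 * If open() fails first, close() still runs during cleanup and must tolerate null.
 */
class NullSafeCloseOperatorSketch {

    // Assigned only at the end of open(); remains null if open() throws earlier.
    private CompiledCondition joinCondition;

    private boolean conditionClosed = false;

    void open(boolean failBeforeInstantiation) {
        if (failBeforeInstantiation) {
            throw new IllegalStateException("failed before the condition was instantiated");
        }
        joinCondition = () -> conditionClosed = true;
    }

    void close() throws Exception {
        // A real operator would call super.close() here first.
        if (joinCondition != null) { // guard against a failed or skipped open()
            joinCondition.close();
        }
    }

    boolean conditionClosed() {
        return conditionClosed;
    }
}
```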
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298454129 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark that this is a temporal table function join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition.
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + + def isRowtimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 3 + } + + def isProctimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 2 + } + + def makeRowTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + + + def containsTemporalJoinCondition(condition: RexNode): Boolean = { Review comment: Emmm. I think a separate method for checking the condition is cleaner here.
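Below is a plain-Java sketch of the arity-based dispatch in `isRowtimeCall` / `isProctimeCall`, plus one possible shape for a separate `containsTemporalJoinCondition` check. `CallSketch` is a hypothetical stand-in for Calcite's `RexCall`. The recursive search is an assumption, because the method body is not part of the quoted diff.

```java
import java.util.List;

/** Hypothetical, simplified expression node: an operator name plus operands. */
final class CallSketch {
    final String operator;
    final List<?> operands; // nested CallSketch instances or plain leaf values

    CallSketch(String operator, List<?> operands) {
        this.operator = operator;
        this.operands = operands;
    }
}

/**
 * Mirrors isRowtimeCall / isProctimeCall from the diff: the arity of the marker
 * call distinguishes the two temporal join flavours. containsTemporalJoinCondition
 * is an assumed recursive search over the condition tree.
 */
final class TemporalJoinConditionSketch {
    static final String TEMPORAL_JOIN_CONDITION = "__TEMPORAL_JOIN_CONDITION";

    private static void checkIsTemporal(CallSketch call) {
        if (!TEMPORAL_JOIN_CONDITION.equals(call.operator)) {
            throw new IllegalArgumentException("Not a temporal join condition: " + call.operator);
        }
    }

    /** (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY): event-time join. */
    static boolean isRowtimeCall(CallSketch call) {
        checkIsTemporal(call);
        return call.operands.size() == 3;
    }

    /** (LEFT_TIME_ATTRIBUTE, PRIMARY_KEY): processing-time join. */
    static boolean isProctimeCall(CallSketch call) {
        checkIsTemporal(call);
        return call.operands.size() == 2;
    }

    /** Returns true if the marker call appears anywhere in the condition tree. */
    static boolean containsTemporalJoinCondition(Object node) {
        if (!(node instanceof CallSketch)) {
            return false; // leaf value
        }
        CallSketch call = (CallSketch) node;
        if (TEMPORAL_JOIN_CONDITION.equals(call.operator)) {
            return true;
        }
        for (Object operand : call.operands) {
            if (containsTemporalJoinCondition(operand)) {
                return true;
            }
        }
        return false;
    }
}
```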
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298453939 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator +import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation} +import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType} +import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} +import org.apache.flink.table.dataformat.BaseRow +import org.apache.flink.table.generated.GeneratedJoinCondition +import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin +import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} +import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION +import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil} +import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator} +import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector +import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.BaseRowTypeInfo +import org.apache.flink.util.Preconditions.checkState +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType} +import org.apache.calcite.rex._ + +import java.util + +import scala.collection.JavaConversions._ + +class StreamExecTemporalTableJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftRel: RelNode, +rightRel: RelNode, +condition: RexNode, +joinType: JoinRelType) + extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType) + with StreamPhysicalRel + with StreamExecNode[BaseRow] { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = { +val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder) + +var rowtimeJoin: Boolean = false +val visitor = new RexVisitorImpl[Unit](true) { + override def visitCall(call: RexCall): Unit = { +if (call.getOperator == TEMPORAL_JOIN_CONDITION) { + rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call) +} else { + call.getOperands.foreach(node => node.accept(this)) +} + } +} +nonEquiJoinRex.accept(visitor) +rowtimeJoin + } + + override def copy( + traitSet: RelTraitSet, + conditionExpr: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { +new StreamExecTemporalTableJoin( + cluster, + traitSet, + left, + right, + conditionExpr, + joinType) + } + + //~ ExecNode methods --- + + override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = { +getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]]) + } + + override def replaceInputNode( +ordinalInParent: Int, +newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = { +replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) + } + + override protected def translateToPlanInternal( Review comment: I would
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298453583 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala ## @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, operationTree: QueryOperation) e override def select(fields: Expression*): Table = ??? override def createTemporalTableFunction( Review comment: The other variant (taking an Expression parameter) will be supported when we integrate table-common's `TableImpl`, and we will add some tests after that.
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298453442 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment( } } -fields.zipWithIndex.foreach { - case ("rowtime", idx) => -extractRowtime(idx, "rowtime", None) - - case ("proctime", idx) => -extractProctime(idx, "proctime") - - case (name, _) => fieldNames = name :: fieldNames +fields.zipWithIndex.foreach { case (name, idx) => Review comment: Yes. We hacked this so that the test util supports "o_rowtime" as a rowtime attribute. Currently, we can't invoke the rowtime/proctime functions in the blink planner.
[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667 ] Yun Gao edited comment on FLINK-12852 at 6/28/19 4:45 AM: -- These are all the possible ways to solve this problem that I can think of: # Reserve buffers for exclusive buffers: it is hard to know how many buffers to reserve for exclusive buffers, especially since it is hard to know how many tasks will be scheduled to this TM. # Make the required buffers and the max buffers the same for all the local buffer pools: previously running jobs may become unable to run, because the total number of buffers may be less than the sum of the updated required buffers. # Postpone the acquisition of exclusive buffers: this does not solve the deadlock, since the downstream task still may not acquire any buffers to make progress. # Add a timeout for _requestMemorySegments_. Therefore, I think we currently need to use the last method. I'd like to: # Add an option for the timeout of requestMemorySegments for each channel. The default timeout is 30s. This option will be marked as undocumented since it may be removed in a future implementation. # Pass the timeout to NetworkBufferPool. # RequestMemorySegments will throw IOException("Insufficient buffer") if not all segments are acquired before the timeout. was (Author: gaoyunhaii): These are all the possible ways to solve this problem that I can think of: # Reserve buffers for exclusive buffers: it is hard to know how many buffers to reserve for exclusive buffers, especially since it is hard to know how many tasks will be scheduled to this TM. # Make the required buffers and the max buffers the same: previously running jobs may become unable to run, because the total number of buffers may be less than the sum of the updated required buffers. # Postpone the acquisition of exclusive buffers: this does not solve the deadlock, since the downstream task still may not acquire any buffers to make progress.
# Add a timeout for _requestMemorySegments_. Therefore, I think we currently need to use the last method. I'd like to: # Add an option for the timeout of requestMemorySegments for each channel. The default timeout is 30s. This option will be marked as undocumented since it may be removed in a future implementation. # Pass the timeout to NetworkBufferPool. # RequestMemorySegments will throw IOException("Insufficient buffer") if not all segments are acquired before the timeout. > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at >
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is because the required and max sizes of the local buffer pool are not the same, so > there may be over-allocation; when assignExclusiveSegments is called, no memory is > available. > > The details of the scenario are as follows: The parallelism of the upstream > vertex and the downstream vertex is 1000 and 500 respectively. There are 20
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667 ] Yun Gao commented on FLINK-12852: - These are all the possible ways to solve this problem that I can think of: # Reserve buffers for exclusive buffers: it is hard to know how many buffers to reserve for exclusive buffers, especially since it is hard to know how many tasks will be scheduled to this TM. # Make the required buffers and the max buffers the same: previously running jobs may become unable to run, because the total number of buffers may be less than the sum of the updated required buffers. # Postpone the acquisition of exclusive buffers: this does not solve the deadlock, since the downstream task still may not acquire any buffers to make progress. # Add a timeout for _requestMemorySegments_. Therefore, I think we currently need to use the last method. I'd like to: # Add an option for the timeout of requestMemorySegments for each channel. The default timeout is 30s. This option will be marked as undocumented since it may be removed in a future implementation. # Pass the timeout to NetworkBufferPool. # RequestMemorySegments will throw IOException("Insufficient buffer") if not all segments are acquired before the timeout.
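The proposal (fail `requestMemorySegments` with `IOException("Insufficient buffer")` once a timeout elapses) can be sketched with the same `ArrayBlockingQueue.poll` call that appears in the quoted stack trace. This is a hypothetical, simplified pool, not Flink's `NetworkBufferPool`. Returning partially acquired segments before throwing is an assumption of the sketch; the comment does not specify it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified buffer pool (not Flink's NetworkBufferPool): memory
 * segments are modeled as byte arrays held in an ArrayBlockingQueue.
 */
class TimeoutBufferPoolSketch {

    private final ArrayBlockingQueue<byte[]> availableSegments;

    TimeoutBufferPoolSketch(int numSegments, int segmentSize) {
        availableSegments = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
    }

    /**
     * Requests numRequired segments within one overall timeout. If the deadline passes
     * before all of them are acquired, the partial allocation is returned to the pool
     * (an assumption of this sketch) and IOException("Insufficient buffer") is thrown.
     */
    List<byte[]> requestMemorySegments(int numRequired, long timeoutMillis)
            throws IOException, InterruptedException {
        List<byte[]> segments = new ArrayList<>(numRequired);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (segments.size() < numRequired) {
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            byte[] segment = availableSegments.poll(remainingNanos, TimeUnit.NANOSECONDS);
            if (segment == null) {
                availableSegments.addAll(segments); // avoid leaking a partial allocation
                throw new IOException("Insufficient buffer");
            }
            segments.add(segment);
        }
        return segments;
    }

    void recycle(List<byte[]> segments) {
        availableSegments.addAll(segments);
    }

    int available() {
        return availableSegments.size();
    }
}
```

In this sketch the deadline covers the whole request rather than each segment, so a request for many segments cannot wait `numRequired` times the timeout in total. The comment leaves open whether the real change should behave this way.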
> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is because the required and max sizes of the local buffer pool are not the same, so > there may be
over-allocation; when assignExclusiveSegments is called, no memory is > available. > > The details of the scenario are as follows: The parallelism of the upstream > vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs > and each TM has 10696 buffers in total and 10 slots. For a TM that runs > 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with > local buffer pools \{required = 500, max = 2 * 500 + 8 = 1008}; they produce > data quickly and each occupies about 990 buffers. Then the downstream task > starts and tries to assign exclusive buffers for 1500 - 9 = 1491 > InputChannels. It requires 2981 buffers but only 1786 are left. Since not all > downstream tasks can start, the job is eventually blocked, no buffer can > be released, and the deadlock occurs. > > Although increasing the network memory solves the problem, I think the > deadlock may not be acceptable. Fine-grained resource management > (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the > network memory in the ResourceProfile.
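The figures in the description can be checked with a short calculation for the TM that runs 9 upstream tasks and 1 downstream task. The calculation treats the reported "about 990" buffers per upstream task as exact:

```latex
\begin{aligned}
\text{max per upstream pool} &= 2 \times 500 + 8 = 1008 > 990 > 500 = \text{required} \\
\text{held by upstream tasks} &\approx 9 \times 990 = 8910 \\
\text{left for the downstream task} &\approx 10696 - 8910 = 1786 \\
\text{needed by the downstream task} &= 2981 > 1786
\end{aligned}
```

Each upstream pool is allowed to grow past its required 500 buffers toward its max of 1008. This is the over-allocation the description refers to, and it leaves too few buffers for the downstream task's exclusive-buffer request ever to be satisfied.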
[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667 ] Yun Gao edited comment on FLINK-12852 at 6/28/19 4:38 AM: -- These are all the possible ways to solve this problem that I can think of: # Reserve buffers for exclusive buffers: it is hard to know how many buffers to reserve for exclusive buffers, especially since it is hard to know how many tasks will be scheduled to this TM. # Make the required buffers and the max buffers the same: previously running jobs may become unable to run, because the total number of buffers may be less than the sum of the updated required buffers. # Postpone the acquisition of exclusive buffers: this does not solve the deadlock, since the downstream task still may not acquire any buffers to make progress. # Add a timeout for _requestMemorySegments_. Therefore, I think we currently need to use the last method. I'd like to: # Add an option for the timeout of requestMemorySegments for each channel. The default timeout is 30s. This option will be marked as undocumented since it may be removed in a future implementation. # Pass the timeout to NetworkBufferPool. # RequestMemorySegments will throw IOException("Insufficient buffer") if not all segments are acquired before the timeout. was (Author: gaoyunhaii): These are all the possible ways to solve this problem that I can think of: # Reserve buffers for exclusive buffers: it is hard to know how many buffers to reserve for exclusive buffers, especially since it is hard to know how many tasks will be scheduled to this TM. # Make the required buffers and the max buffers the same: previously running jobs may become unable to run, because the total number of buffers may be less than the sum of the updated required buffers. # Postpone the acquisition of exclusive buffers: this does not solve the deadlock, since the downstream task still may not acquire any buffers to make progress.
# Add a timeout for _requestMemorySegments_. Therefore, I think we currently need to use the last method. I'd like to: # Add an option for the timeout of requestMemorySegments for each channel. The default timeout is 30s. This option will be marked as undocumented since it may be removed in a future implementation. # Pass the timeout to NetworkBufferPool. # RequestMemorySegments will throw IOException("Insufficient buffer") if not all segments are acquired before the timeout. > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at >
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is because the required and max sizes of the local buffer pool are not the same, so > there may be over-allocation; when assignExclusiveSegments is called, no memory is > available. > > The details of the scenario are as follows: The parallelism of the upstream > vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs > and each TM has 1
[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506595119 > sorry, I should have asked @zentol to help review. > > @lirui-apache it seems I might have misunderstood how the stage is run? Shouldn't the profile only recompile and retest the flink-connector-hive module? The approach in this PR recompiles flink-connector-hive and all the modules it depends on. To recompile only flink-connector-hive, we can remove the `-am` option. My understanding is that if we remove `-am`, the modules it depends on will be unavailable (or downloaded from somewhere that doesn't reflect the changes in a PR), because the local Maven repo is cleared for each Travis job. @zentol please correct me if I misunderstand. Another alternative is to skip the recompile completely and only run tests against Hive-1.2.1, so that we get rid of most of the overhead while still being able to catch issues with Hive-1.2.1. The drawback of this approach is that errors that happen during tests are probably harder to understand than errors that happen during compilation. For example, we'll get `NoClassDefFoundError` instead of `cannot find symbol`.
[GitHub] [flink] JingsongLi commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator
JingsongLi commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator URL: https://github.com/apache/flink/pull/8821#issuecomment-506592750 LGTM +1. @liyafan82, can you squash these commits into one?
[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298446897 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java ## @@ -73,7 +73,7 @@ public void before() { @Test public void testCreateTable() { - check("CREATE TABLE tbl1 (\n" + + check("CREATE TABLE db.tbl1 (\n" + Review comment: A mistake ...
[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro
[ https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-13009: -- Affects Version/s: 1.8.0 > YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots > throws NPE on Travis > - > > Key: FLINK-13009 > URL: https://issues.apache.org/jira/browse/FLINK-13009 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.8.0 >Reporter: Congxian Qiu(klion26) >Priority: Major > > The test > {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} > throws an NPE on Travis. > The NPE is thrown from RMAppAttemptMetrics.java#128; the following is the code > from hadoop-2.8.3[1]: > {code:java} > // Only add in the running containers if this is the active attempt. > 128 RMAppAttempt currentAttempt = rmContext.getRMApps() > 129 .get(attemptId.getApplicationId()).getCurrentAppAttempt(); > {code} > > log [https://api.travis-ci.org/v3/job/550689578/log.txt] > [1] > [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
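The failing line chains `rmContext.getRMApps().get(...)` directly into `getCurrentAppAttempt()`. It therefore throws an NPE whenever the application id is missing from the map. The sketch below reproduces that pattern with a plain `Map` and a hypothetical `App` type. It is not Hadoop's code, and the issue does not establish whether a null guard is the right fix on the Hadoop side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the failing lookup in RMAppAttemptMetrics:
 * apps.get(appId).getCurrentAppAttempt() throws an NPE when appId is absent.
 */
public class NullSafeAttemptLookup {

    /** Stand-in for an application; only its current attempt id is modeled. */
    static final class App {
        private final String currentAttempt;

        App(String currentAttempt) {
            this.currentAttempt = currentAttempt;
        }

        String getCurrentAppAttempt() {
            return currentAttempt;
        }
    }

    /** Mirrors the chained call: throws NullPointerException if appId is missing. */
    static String unsafeLookup(Map<String, App> apps, String appId) {
        return apps.get(appId).getCurrentAppAttempt();
    }

    /** Null-safe variant: returns an empty Optional instead of throwing. */
    static Optional<String> safeLookup(Map<String, App> apps, String appId) {
        return Optional.ofNullable(apps.get(appId)).map(App::getCurrentAppAttempt);
    }

    public static void main(String[] args) {
        Map<String, App> apps = new HashMap<>();
        apps.put("app_1", new App("attempt_1"));
        System.out.println(safeLookup(apps, "app_1")); // Optional[attempt_1]
        System.out.println(safeLookup(apps, "app_2")); // Optional.empty
    }
}
```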
[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-12852: Description: When running tests with an upstream vertex and a downstream vertex, a deadlock occurs when submitting the job:
{code:java}
"Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
	- locked <0x00073fbc81f0> (a java.lang.Object)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
	at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
	at java.lang.Thread.run(Thread.java:834)
{code}
This happens because the required and max sizes of the local buffer pool are not the same, so the pools may over-allocate, and when assignExclusiveSegments runs there is no memory available. The details of the scenario are as follows: the parallelism of the upstream and downstream vertices is 1000 and 500, respectively. There are 200 TMs; each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}. They produce data quickly, and each occupies about 990 buffers. Then the downstream task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers, but only 1786 are left. Since not all downstream tasks can start, the job is eventually blocked, no buffer can be released, and the deadlock occurs. Although increasing the network memory solves the problem, I think the deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the network memory in the ResourceProfile.
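The buffer arithmetic in the FLINK-12852 description can be replayed in a few lines of Java. The sketch uses only the numbers quoted in the issue: 10696 buffers per TM, 9 upstream tasks holding about 990 buffers each, the `2 * 500 + 8` pool maximum, and 2981 buffers requested by the downstream task. The class and method names are hypothetical and are not Flink APIs.

```java
/**
 * Back-of-the-envelope replay of the FLINK-12852 numbers.
 * All names are illustrative; nothing here calls Flink.
 */
public class BufferBudget {

    /** Max size of an upstream local buffer pool, using the quoted formula 2 * n + 8. */
    static int upstreamPoolMax(int numSubpartitions) {
        return 2 * numSubpartitions + 8;
    }

    /** Buffers left on the TM after the upstream tasks have taken theirs. */
    static int remaining(int totalBuffers, int upstreamTasks, int buffersPerUpstreamTask) {
        return totalBuffers - upstreamTasks * buffersPerUpstreamTask;
    }

    public static void main(String[] args) {
        int total = 10696;               // buffers per TM (from the issue)
        int upstreamTasks = 9;           // upstream tasks on this TM
        int occupiedPerTask = 990;       // approx. buffers held by each upstream task
        int requiredByDownstream = 2981; // exclusive buffers requested (as quoted)

        int left = remaining(total, upstreamTasks, occupiedPerTask);
        System.out.println("pool max = " + upstreamPoolMax(500)); // pool max = 1008
        System.out.println("left = " + left);                      // left = 1786
        // The upstream pools grew past `required` toward `max`, so the downstream
        // task cannot get all of its exclusive segments and waits.
        System.out.println("blocks = " + (requiredByDownstream > left)); // blocks = true
    }
}
```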
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298435255 ## File path: docs/dev/table/common.zh.md ## @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable") // execute env.execute() +{% endhighlight %} + + + +{% highlight python %} +# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment +env = StreamExecutionEnvironment.get_execution_environment() + +# create a TableEnvironment +table_env = StreamTableEnvironment.create(env) + +# register a Table +table_env.register_table("table1", ...) # or +table_env.register_table_source("table2", ...) # or Review comment: Delete the "# or"?
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298435312 ## File path: docs/dev/table/common.zh.md ## @@ -392,6 +486,26 @@ val revenue = orders **Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions. + + +{% highlight python %} +# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently +table_env = StreamTableEnvironment.create(env) + +# register Orders table + +# scan registered Orders table +orders = table_env.scan("Orders") +# compute revenue for all customers from France +revenue = orders \ +.filter("cCountry === 'FRANCE'") Review comment: Should we add `\` after the `filter` and `groupby` method calls?
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298440322 ## File path: docs/dev/table/functions.md ## @@ -316,7 +316,7 @@ value NOT IN (sub-query) - + Review comment: On line 432, "STRING.similar(STRING)" was already wrong before this change. Can you fix it along the way, in both this file and the corresponding zh.md file? Thank you. STRING.similar(STRING) -> STRING1.similar(STRING2)
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298430168 ## File path: docs/dev/table/common.md ## @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable") // execute env.execute() +{% endhighlight %} + + + +{% highlight python %} +# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment +env = StreamExecutionEnvironment.get_execution_environment() + +# create a TableEnvironment +table_env = StreamTableEnvironment.create(env) + +# register a Table +table_env.register_table("table1", ...) # or +table_env.register_table_source("table2", ...) # or Review comment: Delete the "# or"?
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298438156 ## File path: docs/dev/table/connect.md ## @@ -170,6 +181,50 @@ tableEnvironment {% endhighlight %} + +{% highlight python %} +table_environment \ +.connect( # declare the external system to connect to + Kafka() Review comment: This code style violates E126 (continuation line over-indented for hanging indent), and there are similar problems in other code. Is there a better code style?
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298441273 ## File path: docs/dev/table/streaming/query_configuration.md ## @@ -135,6 +160,16 @@ val qConfig: StreamQueryConfig = ??? // set idle state retention time: min = 12 hours, max = 24 hours qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24)) +{% endhighlight %} + + +{% highlight python %} + +q_config = ... Review comment: Add a comment stating that the type of the variable q_config is "StreamQueryConfig".
[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#discussion_r298435022 ## File path: docs/dev/table/common.md ## @@ -392,6 +486,26 @@ val revenue = orders **Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions. + + +{% highlight python %} +# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently +table_env = StreamTableEnvironment.create(env) + +# register Orders table + +# scan registered Orders table +orders = table_env.scan("Orders") +# compute revenue for all customers from France +revenue = orders \ +.filter("cCountry === 'FRANCE'") Review comment: Should we add `\` after the `filter` and `groupby` method calls?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298442961 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.temporal; + +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; + +/** + * This operator works by keeping on the state collection of probe and build records to process + * on next watermark. The idea is that between watermarks we are collecting those elements + * and once we are sure that there will be no updates we emit the correct result and clean up the + * state. 
+ * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older then the current watermark. Build side is also cleaned up in the similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last + * watermark. + * + * One more trick is how the emitting results and cleaning up is triggered. It is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or + * cleaning up the state). However this would cause huge number of registered timers. For example + * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we + * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that + * we always keep only one single registered timer for any given key, registered for the minimal + * value. Upon triggering it, we process all records with event times older then or equal to + * currentWatermark. + */ +public class TemporalRowTimeJoinOperator Review comment: The logic of `TemporalRowTimeJoinOperator` is a bit complicated. Could you consider adding a `TemporalRowTimeJoinOperatorTest`?
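The single-timer-per-key strategy described in the Javadoc can be sketched for one key. The sketch rests on several assumptions: event times are plain `long`s, rows are strings, and the build version valid at time t is the latest one whose version time is <= t. It is a simplified model, not the code of `TemporalRowTimeJoinOperator`, and `SingleTimerTemporalJoin` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical single-key model of the Javadoc's strategy: buffer probe and build
 * rows, keep ONE timer at the minimal pending event time, and when it fires,
 * process everything up to the current watermark. Not Flink's operator.
 */
public class SingleTimerTemporalJoin {

    // eventTime -> probe rows with that event time
    private final TreeMap<Long, List<String>> probe = new TreeMap<>();
    // versionTime -> build-side row (assumes one version per timestamp)
    private final TreeMap<Long, String> build = new TreeMap<>();
    private Long timer; // the single registered timer for this key, or null
    int timerFirings;
    final List<String> output = new ArrayList<>();

    void onProbe(long time, String row) {
        probe.computeIfAbsent(time, t -> new ArrayList<>()).add(row);
        registerAtMin(time);
    }

    void onBuild(long time, String row) {
        build.put(time, row);
        registerAtMin(time);
    }

    /** Never register a second timer; only move the existing one earlier. */
    private void registerAtMin(long time) {
        if (timer == null || time < timer) {
            timer = time;
        }
    }

    void onWatermark(long watermark) {
        if (timer == null || timer > watermark) {
            return; // the timer has not fired yet
        }
        timerFirings++;
        // Emit every probe row with event time <= watermark, joined with the
        // latest build version whose time is not after the probe row's time.
        Map<Long, List<String>> due = probe.headMap(watermark, true);
        for (Map.Entry<Long, List<String>> e : due.entrySet()) {
            Map.Entry<Long, String> version = build.floorEntry(e.getKey());
            if (version == null) {
                continue; // no valid version: this model drops the row (inner join)
            }
            for (String row : e.getValue()) {
                output.add(row + "|" + version.getValue());
            }
        }
        due.clear(); // clearing the view removes these rows from `probe`
        // Clean up the build side, but always keep the latest version <= watermark.
        Long latest = build.floorKey(watermark);
        if (latest != null) {
            build.headMap(latest, false).clear();
        }
        // Re-register one timer for whatever is still pending.
        timer = null;
        if (!probe.isEmpty()) {
            registerAtMin(probe.firstKey());
        }
        Long nextBuild = build.higherKey(watermark);
        if (nextBuild != null) {
            registerAtMin(nextBuild);
        }
    }

    public static void main(String[] args) {
        SingleTimerTemporalJoin join = new SingleTimerTemporalJoin();
        join.onBuild(0, "rate@0");
        for (long t : new long[] {1, 2, 5, 8, 9}) {
            join.onProbe(t, "order@" + t);
        }
        join.onWatermark(10);
        System.out.println(join.output.size()); // 5
        System.out.println(join.timerFirings);  // 1
    }
}
```

The `main` method uses the probe times {1, 2, 5, 8, 9} from the Javadoc. On Watermark(10), the model emits all five results from a single timer firing instead of five.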
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298440940 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java ## @@ -117,7 +117,9 @@ public void open() throws Exception { @Override public void close() throws Exception { super.close(); - joinCondition.backingJoinCondition.close(); + if (joinCondition != null) { Review comment: Is there any chance that `joinCondition` is not initialized?
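One plausible motivation for the `joinCondition != null` guard is that `close()` may run even when `open()` did not complete. In that case, fields assigned in `open()` are still null. The sketch below shows that pattern with hypothetical classes (`GuardedCloseOperator`, `Condition`). Whether Flink actually calls `close()` in that situation for this operator is exactly what the review comment asks, so treat it as an assumption.

```java
/**
 * Hypothetical sketch of a close() that tolerates a missing open():
 * fields created in open() may still be null when close() runs during cleanup.
 */
public class GuardedCloseOperator implements AutoCloseable {

    /** Stand-in for the generated join condition that is created in open(). */
    static final class Condition implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    Condition joinCondition; // assigned only in open()

    void open() {
        joinCondition = new Condition();
    }

    @Override
    public void close() {
        // Guard: open() may never have run, so there may be nothing to close.
        if (joinCondition != null) {
            joinCondition.close();
        }
    }

    public static void main(String[] args) {
        GuardedCloseOperator neverOpened = new GuardedCloseOperator();
        neverOpened.close(); // no NullPointerException thanks to the guard

        GuardedCloseOperator opened = new GuardedCloseOperator();
        opened.open();
        opened.close();
        System.out.println(opened.joinCondition.closed); // true
    }
}
```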
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298440480 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/TemporalUdtfJoinTest.scala ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.stream.sql.join + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase} +import org.hamcrest.Matchers.containsString +import org.junit.Test + +class TemporalUdtfJoinTest extends TableTestBase { Review comment: Regarding "Udtf": nothing here is user-defined, so maybe use "table function" in the name instead?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298442473 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.temporal; + +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; + +/** + * This operator works by keeping on the state collection of probe and build records to process + * on next watermark. The idea is that between watermarks we are collecting those elements + * and once we are sure that there will be no updates we emit the correct result and clean up the + * state. 
+ * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older then the current watermark. Build side is also cleaned up in the similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last + * watermark. + * + * One more trick is how the emitting results and cleaning up is triggered. It is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or + * cleaning up the state). However this would cause huge number of registered timers. For example + * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we + * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that + * we always keep only one single registered timer for any given key, registered for the minimal + * value. Upon triggering it, we process all records with event times older then or equal to + * currentWatermark. + */ +public class TemporalRowTimeJoinOperator Review comment: It seems that `TemporalRowTimeJoinOperator` does not deal with `RetentionTime`; maybe you should add a TODO or implement a retention mechanism.
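If a retention mechanism were added, one possible policy for a [min, max] idle-state retention window works as follows. Register a cleanup timer at `access + max`, and move it only when an access would otherwise leave less than `min` of retention before cleanup. This keeps timer churn low while retaining state for at least `min` and at most `max` after the last access. The sketch is a hypothetical, single-key model in plain Java with times as `long`. It is not Flink's implementation.

```java
/**
 * Hypothetical single-key model of a [min, max] idle-state-retention policy.
 * Times are plain longs (e.g. milliseconds); no Flink classes are used.
 */
public class RetentionPolicy {

    private final long minRetention;
    private final long maxRetention;
    private Long registeredCleanupTime; // null = no cleanup timer yet
    private int timerRegistrations;

    RetentionPolicy(long minRetention, long maxRetention) {
        if (minRetention > maxRetention) {
            throw new IllegalArgumentException("min retention must not exceed max retention");
        }
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Called on every state access for the key; may move the cleanup timer. */
    void onAccess(long now) {
        // Move the timer only if keeping it would clean up earlier than now + min.
        if (registeredCleanupTime == null || now + minRetention > registeredCleanupTime) {
            registeredCleanupTime = now + maxRetention;
            timerRegistrations++;
        }
    }

    Long cleanupTime() {
        return registeredCleanupTime;
    }

    int registrations() {
        return timerRegistrations;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        RetentionPolicy p = new RetentionPolicy(12 * hour, 24 * hour);
        p.onAccess(0);          // timer registered at 24h
        p.onAccess(10 * hour);  // 10h + 12h = 22h <= 24h: timer kept
        p.onAccess(13 * hour);  // 13h + 12h = 25h > 24h: timer moved to 37h
        System.out.println(p.cleanupTime() / hour); // 37
        System.out.println(p.registrations());      // 2
    }
}
```

The `main` method uses min = 12 hours and max = 24 hours, matching the `withIdleStateRetentionTime(Time.hours(12), Time.hours(24))` example in the query-configuration docs. Three accesses cause only two timer registrations.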
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298441480 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator +import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation} +import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType} +import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} +import org.apache.flink.table.dataformat.BaseRow +import org.apache.flink.table.generated.GeneratedJoinCondition +import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin +import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} +import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION +import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil} +import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator} +import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector +import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.BaseRowTypeInfo +import org.apache.flink.util.Preconditions.checkState +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType} +import org.apache.calcite.rex._ + +import java.util + +import scala.collection.JavaConversions._ + +class StreamExecTemporalTableJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftRel: RelNode, +rightRel: RelNode, +condition: RexNode, +joinType: JoinRelType) + extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType) + with StreamPhysicalRel + with StreamExecNode[BaseRow] { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = { +val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder) + +var rowtimeJoin: Boolean = false +val visitor = new RexVisitorImpl[Unit](true) { + override def visitCall(call: RexCall): Unit = { +if (call.getOperator == TEMPORAL_JOIN_CONDITION) { + rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call) +} else { + call.getOperands.foreach(node => node.accept(this)) +} + } +} +nonEquiJoinRex.accept(visitor) +rowtimeJoin + } + + override def copy( + traitSet: RelTraitSet, + conditionExpr: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { +new StreamExecTemporalTableJoin( + cluster, + traitSet, + left, + right, + conditionExpr, + joinType) + } + + //~ ExecNode methods --- + + override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = { +getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]]) + } + + override def replaceInputNode( +ordinalInParent: Int, +newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = { +replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) + } + + override protected def translateToPlanInternal( +tableEnv: StreamTable
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298442640 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.temporal; + +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; + +/** + * This operator works by keeping on the state collection of probe and build records to process + * on next watermark. The idea is that between watermarks we are collecting those elements + * and once we are sure that there will be no updates we emit the correct result and clean up the + * state. 
+ * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older then the current watermark. Build side is also cleaned up in the similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last + * watermark. + * + * One more trick is how the emitting results and cleaning up is triggered. It is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or + * cleaning up the state). However this would cause huge number of registered timers. For example + * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we + * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that + * we always keep only one single registered timer for any given key, registered for the minimal + * value. Upon triggering it, we process all records with event times older then or equal to + * currentWatermark. + */ +public class TemporalRowTimeJoinOperator + extends AbstractStreamOperator + implements TwoInputStreamOperator, Triggerable { + + private static final long serialVersionUID = 6642514795175288193L; + + private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index"; + private static final String LEFT_STATE_NAME = "left"; + private static final String RIGHT_STATE_NAME = "right"; + private static final String REGISTERED_TIMER_STATE_NAME = "timer"; + private static final String TIMERS_STATE_NAME = "timers"; + + private final BaseRowTypeInfo leftType; + private final BaseRowTypeInfo rightType; + private final Generat
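The buffering and single-timer strategy in the TemporalRowTimeJoinOperator javadoc above can be illustrated with a small, self-contained model. This is a hypothetical single-key sketch, not the operator from the PR. The class name `RowTimeJoinSketch`, the string payloads and the `onWatermark` method are assumptions. A `TreeMap` stands in for build-side state, and a nullable field stands in for the one registered timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of the javadoc's strategy; not Flink's TemporalRowTimeJoinOperator.
class RowTimeJoinSketch {

    static final class Probe {
        final long time;
        final String payload;

        Probe(long time, String payload) {
            this.time = time;
            this.payload = payload;
        }
    }

    private final List<Probe> probeBuffer = new ArrayList<>();
    // Build-side versions ordered by event time (stands in for keyed state).
    private final TreeMap<Long, String> buildVersions = new TreeMap<>();
    // The single timer kept for this key; null means no timer is registered.
    private Long timer = null;
    final List<String> emitted = new ArrayList<>();

    Long registeredTimer() {
        return timer;
    }

    int buildVersionCount() {
        return buildVersions.size();
    }

    void processProbe(long time, String payload) {
        probeBuffer.add(new Probe(time, payload));
        registerTimer(time);
    }

    void processBuild(long time, String version) {
        buildVersions.put(time, version);
        registerTimer(time);
    }

    // Keep only one timer, registered for the minimal pending event time.
    private void registerTimer(long time) {
        if (timer == null || time < timer) {
            timer = time;
        }
    }

    // Simulates a watermark; does work only if the single timer is due.
    void onWatermark(long watermark) {
        if (timer == null || timer > watermark) {
            return;
        }
        timer = null;
        List<Probe> pending = new ArrayList<>();
        for (Probe p : probeBuffer) {
            if (p.time <= watermark) {
                // Inner join with the latest build version at or before the probe's time.
                Map.Entry<Long, String> version = buildVersions.floorEntry(p.time);
                if (version != null) {
                    emitted.add(p.payload + "->" + version.getValue());
                }
            } else {
                pending.add(p);
                registerTimer(p.time);
            }
        }
        probeBuffer.clear();
        probeBuffer.addAll(pending);
        // Drop build versions older than the latest one at or before the watermark,
        // but always keep that latest one for future probe records.
        Long latest = buildVersions.floorKey(watermark);
        if (latest != null) {
            buildVersions.headMap(latest, false).clear();
        }
        // Newer build versions still need a timer so they are cleaned up eventually.
        Long nextBuild = buildVersions.higherKey(watermark);
        if (nextBuild != null) {
            registerTimer(nextBuild);
        }
    }
}
```

With this scheme, the javadoc's example of probe records at event times {1, 2, 5, 8, 9} leaves a single timer registered at 1. Watermark(10) then triggers one callback that handles all five records, instead of five separate timers.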
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298441284 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.temporal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.table.dataformat.BaseRow; + +import java.io.IOException; +import java.util.Optional; + +/** + * An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * {@code minRetentionTime} and {@code maxRetentionTime}. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + * + * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time + * timers in your business logic. The reason is that: + * + * 1) if your timers collide with clean up timers and you delete them, then state + * clean-up will not be performed, and + * + * 2) (this one is the reason why this class does not allow to override the onProcessingTime()) Review comment: Why not make `onProcessingTime` final?
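The "at most one timer per key" rule in the BaseTwoInputStreamOperatorWithStateRetention javadoc above can be sketched as follows. This is an assumed scheme, not code from the PR. The names `RetentionTimerSketch`, `touch` and `cleanupTime` are hypothetical, and a `HashMap` stands in for both keyed state and the processing-time timer service. A key's timer is moved to `now + maxRetentionTime` only when the existing one would fire before `minRetentionTime` has passed, so frequently accessed keys do not re-register a timer on every record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the HashMap stands in for keyed ValueState plus the timer service.
class RetentionTimerSketch {
    private final long minRetentionTime;
    private final long maxRetentionTime;
    // key -> processing time at which that key's single cleanup timer fires
    private final Map<String, Long> cleanupTimes = new HashMap<>();
    private int registrations = 0;

    RetentionTimerSketch(long minRetentionTime, long maxRetentionTime) {
        if (minRetentionTime > maxRetentionTime) {
            throw new IllegalArgumentException("minRetentionTime must not exceed maxRetentionTime");
        }
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
    }

    // Called whenever the key's state is accessed at processing time `now`.
    void touch(String key, long now) {
        Long current = cleanupTimes.get(key);
        // Move the timer only if none exists or the existing one would fire
        // before minRetentionTime has elapsed since this access.
        if (current == null || now + minRetentionTime > current) {
            // Overwriting models "delete the old timer, register a new one".
            cleanupTimes.put(key, now + maxRetentionTime);
            registrations++;
        }
    }

    // Models the processing-time callback; true means the key's state should be cleared.
    boolean onProcessingTime(String key, long now) {
        Long current = cleanupTimes.get(key);
        if (current != null && current <= now) {
            cleanupTimes.remove(key);
            return true;
        }
        return false;
    }

    Long cleanupTime(String key) {
        return cleanupTimes.get(key);
    }

    int registrations() {
        return registrations;
    }
}
```

For example, with minRetentionTime = 10 and maxRetentionTime = 20, accesses at times 0, 5 and 12 register a timer only twice: first for time 20, then (because 12 + 10 > 20) for time 32. Since this bookkeeping assumes the base class sees every processing-time callback, declaring `onProcessingTime` as `final` would let the compiler enforce that subclasses cannot bypass it.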
[GitHub] [flink] bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506587670 Sorry, I should have asked @zentol to help review. @lirui-apache, it seems I may have misunderstood how the stage is run? Shouldn't the profile only recompile and retest the flink-connector-hive module?
[jira] [Comment Edited] (FLINK-10701) Move modern kafka connector module into connector profile
[ https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874616#comment-16874616 ] vinoyang edited comment on FLINK-10701 at 6/28/19 3:18 AM: --- Hi [~Zentol] I have seen a separate test profile named "kafka/gelly" in Travis. It seems this issue is obsolete, right? was (Author: yanghua): Hi [~Zentol] I have seen a separate test profile in Travis. It seems this issue is obsolete, right? > Move modern kafka connector module into connector profile > -- > > Key: FLINK-10701 > URL: https://issues.apache.org/jira/browse/FLINK-10701 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The modern connector is run in the {{misc}} profile since it wasn't properly > added to the {{connector}} profile in {{stage.sh}}. Click > [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to > see more details. > *This issue is blocked by FLINK-10603.* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile
[ https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874616#comment-16874616 ] vinoyang commented on FLINK-10701: -- Hi [~Zentol] I have seen a separate test profile in Travis. It seems this issue is obsolete, right?
[GitHub] [flink] JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512 Can you describe the rule processing for LookupJoin and TemporalTableJoin separately? I also strongly suggest renaming these classes, because their names are really confusing...
[GitHub] [flink] JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512 Can you describe the rule processing for LookupJoin and TemporalTableJoin separately?
[jira] [Commented] (FLINK-13025) Elasticsearch 7.x support
[ https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874614#comment-16874614 ] vinoyang commented on FLINK-13025: -- Thank you for opening this issue, [~Keegan-CloudImperium]; I was going to open it and try to implement it. Hi [~aljoscha], have we ever considered providing a universal Elasticsearch connector, like the universal Kafka connector? Is this idea practical? > Elasticsearch 7.x support > - > > Key: FLINK-13025 > URL: https://issues.apache.org/jira/browse/FLINK-13025 > Project: Flink > Issue Type: New Feature > Components: Connectors / ElasticSearch >Affects Versions: 1.8.0 >Reporter: Keegan Standifer >Priority: Major > > Elasticsearch 7.0.0 was released in April of 2019: > [https://www.elastic.co/blog/elasticsearch-7-0-0-released] > The latest elasticsearch connector is > [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298430341 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink} +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class TemporalTableFunctionJoinITCase(state: StateBackendMode) Review comment: Why not just `TemporalJoinITCase`, to align with flink-planner?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298435721 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.physical.stream + +import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical._ +import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin +import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition +import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.JoinRelType + +import java.util + +class StreamExecTemporalUdtfJoinRule Review comment: Why not use `StreamExecTemporalTableJoinRule`?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298429826 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java ## @@ -73,7 +73,7 @@ public void before() { @Test public void testCreateTable() { - check("CREATE TABLE tbl1 (\n" + + check("CREATE TABLE db.tbl1 (\n" + Review comment: Why?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298431205 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment( } } -fields.zipWithIndex.foreach { - case ("rowtime", idx) => -extractRowtime(idx, "rowtime", None) - - case ("proctime", idx) => -extractProctime(idx, "proctime") - - case (name, _) => fieldNames = name :: fieldNames +fields.zipWithIndex.foreach { case (name, idx) => Review comment: Why do we change this? It seems that flink-planner does not have this logic. Does flink-planner need to invoke the rowtime/proctime function to define a time attribute?
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298436775 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark this is a temporal tablefunction join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition. 
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + + def isRowtimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 3 + } + + def isProctimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 2 + } + + def makeRowTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + + + def containsTemporalJoinCondition(condition: RexNode): Boolean = { Review comment: NIT: use anonymous class?
```
def containsTemporalJoinCondition(condition: RexNode): Boolean = {
  var hasTemporalJoinCondition: Boolean = false
  condition.accept(new RexVisitorImpl[Void](true) {
    override def visitCall(call: RexCall): Void = {
      if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
        super.visitCall(call)
      } else {
        hasTemporalJoinCondition = true
        null
      }
    }
  })
  hasTemporalJoinCondition
}
```
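The anonymous-visitor style suggested in the review comment above translates directly to Java. The sketch below uses a hypothetical toy expression tree (`Expr`, `Call`, `Leaf`, `DeepVisitor`) instead of Calcite's `RexNode`/`RexVisitorImpl`, so it runs without Calcite on the classpath. It also mirrors the operand-count convention of `isRowtimeCall`, which `requireWatermark` in `StreamExecTemporalTableJoin` relies on: the rowtime form of `__TEMPORAL_JOIN_CONDITION` carries three operands, the proctime form two.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy expression tree standing in for Calcite's RexNode/RexCall.
interface Expr {
    <R> R accept(ExprVisitor<R> visitor);
}

interface ExprVisitor<R> {
    R visitCall(Call call);

    R visitLeaf(Leaf leaf);
}

final class Leaf implements Expr {
    final String name;

    Leaf(String name) {
        this.name = name;
    }

    @Override
    public <R> R accept(ExprVisitor<R> visitor) {
        return visitor.visitLeaf(this);
    }
}

final class Call implements Expr {
    final String operator;
    final List<Expr> operands;

    Call(String operator, Expr... operands) {
        this.operator = operator;
        this.operands = Arrays.asList(operands);
    }

    @Override
    public <R> R accept(ExprVisitor<R> visitor) {
        return visitor.visitCall(this);
    }
}

// Deep visitor that walks every operand, similar in spirit to RexVisitorImpl(deep = true).
class DeepVisitor implements ExprVisitor<Void> {
    @Override
    public Void visitCall(Call call) {
        for (Expr operand : call.operands) {
            operand.accept(this);
        }
        return null;
    }

    @Override
    public Void visitLeaf(Leaf leaf) {
        return null;
    }
}

final class TemporalConditionSketch {
    static final String TEMPORAL_JOIN_CONDITION = "__TEMPORAL_JOIN_CONDITION";

    static boolean containsTemporalJoinCondition(Expr condition) {
        // Anonymous classes cannot reassign captured locals, so a one-element array holds the flag.
        final boolean[] found = {false};
        condition.accept(new DeepVisitor() {
            @Override
            public Void visitCall(Call call) {
                if (!TEMPORAL_JOIN_CONDITION.equals(call.operator)) {
                    return super.visitCall(call);
                }
                found[0] = true;
                return null; // marker found; no need to descend into its operands
            }
        });
        return found[0];
    }

    // Mirrors isRowtimeCall: the rowtime form has 3 operands, the proctime form has 2.
    static boolean isRowtimeCall(Call call) {
        if (!TEMPORAL_JOIN_CONDITION.equals(call.operator)) {
            throw new IllegalArgumentException("not a temporal join condition: " + call.operator);
        }
        return call.operands.size() == 3;
    }
}
```

The one-element array is needed only in Java; in the Scala snippet a captured `var` plays the same role, because Scala allows anonymous classes to reassign captured local variables.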
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298430439 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink} +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class TemporalTableFunctionJoinITCase(state: StateBackendMode) + extends StreamingWithStateTestBase(state) { + + /** +* Because of nature of the processing time, we can not (or at least it is not that easy) +* validate the result here. Instead of that, here we are just testing whether there are no +* exceptions in a full blown ITCase. Actual correctness is tested in unit tests. 
+*/ + @Test + def testProcessTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + +val sqlQuery = + """ +|SELECT +| o.amount * r.rate AS amount +|FROM +| Orders AS o, +| LATERAL TABLE (Rates(o.proctime)) AS r +|WHERE r.currency = o.currency +|""".stripMargin + +val ordersData = new mutable.MutableList[(Long, String)] +ordersData.+=((2L, "Euro")) +ordersData.+=((1L, "US Dollar")) +ordersData.+=((50L, "Yen")) +ordersData.+=((3L, "Euro")) +ordersData.+=((5L, "US Dollar")) + +val ratesHistoryData = new mutable.MutableList[(String, Long)] +ratesHistoryData.+=(("US Dollar", 102L)) +ratesHistoryData.+=(("Euro", 114L)) +ratesHistoryData.+=(("Yen", 1L)) +ratesHistoryData.+=(("Euro", 116L)) +ratesHistoryData.+=(("Euro", 119L)) + +val orders = env + .fromCollection(ordersData) + .toTable(tEnv, 'amount, 'currency, 'proctime) +val ratesHistory = env + .fromCollection(ratesHistoryData) + .toTable(tEnv, 'currency, 'rate, 'proctime) + +tEnv.registerTable("Orders", orders) +tEnv.registerTable("RatesHistory", ratesHistory) +tEnv.registerFunction( + "Rates", + ratesHistory.createTemporalTableFunction("proctime", "currency")) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new TestingAppendSink) +env.execute() + } + + @Test + def testEventTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +val sqlQuery = + """ +|SELECT +| o.amount * r.rate AS amount +|FROM +| Orders AS o, +| LATERAL TABLE (Rates(o.rowtime)) AS r +|WHERE r.currency = o.currency +|""".stripMargin + Review comment: remove empty line This is an automated message from the Apache Git Service. 
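The ITCase above only checks that the temporal table function join runs without exceptions. The following sketch is a simplified, single-threaded model of the event-time semantics, not Flink's `TemporalRowTimeJoinOperator`.

* `VersionedRates` and its methods are hypothetical.
* The version timestamps in the usage example are invented for illustration.
* `Rates(o.rowtime)` is assumed to return, per currency, the latest rate version whose timestamp is at or before the order's time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned "RatesHistory" table keyed by currency.
// It illustrates event-time temporal join semantics only; it is not Flink's operator.
final class VersionedRates {
    // currency -> (version timestamp -> rate)
    private final Map<String, TreeMap<Long, Long>> versions = new HashMap<>();

    void addVersion(String currency, long timestamp, long rate) {
        versions.computeIfAbsent(currency, c -> new TreeMap<>()).put(timestamp, rate);
    }

    // The version valid at `time` is the latest one with timestamp <= time.
    // Returns null when no version exists yet, so an inner join emits nothing.
    Long rateAt(String currency, long time) {
        TreeMap<Long, Long> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Long> version = history.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    // Mirrors `o.amount * r.rate` for a single order row.
    Long convertedAmount(long amount, String currency, long orderTime) {
        Long rate = rateAt(currency, orderTime);
        return rate == null ? null : amount * rate;
    }
}
```

Suppose Euro has rate versions 114 at t=1 and 116 at t=5. An order of 2 Euro at t=3 then joins with rate 114 and yields 228. An order in a currency without any version produces no row, as in an inner join.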
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298431452 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala ## @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, operationTree: QueryOperation) e override def select(fields: Expression*): Table = ??? override def createTemporalTableFunction( Review comment: It seems that the only way to use a temporal join is through the Table class.
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298432231 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator +import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation} +import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType} +import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} +import org.apache.flink.table.dataformat.BaseRow +import org.apache.flink.table.generated.GeneratedJoinCondition +import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin +import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} +import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION +import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil} +import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator} +import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector +import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.BaseRowTypeInfo +import org.apache.flink.util.Preconditions.checkState +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType} +import org.apache.calcite.rex._ + +import java.util + +import scala.collection.JavaConversions._ + +class StreamExecTemporalTableJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftRel: RelNode, +rightRel: RelNode, +condition: RexNode, +joinType: JoinRelType) + extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType) + with StreamPhysicalRel + with StreamExecNode[BaseRow] { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = { +val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder) + +var rowtimeJoin: Boolean = false +val visitor = new RexVisitorImpl[Unit](true) { + override def visitCall(call: RexCall): Unit = { +if (call.getOperator == TEMPORAL_JOIN_CONDITION) { + rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call) +} else { + call.getOperands.foreach(node => node.accept(this)) +} + } +} +nonEquiJoinRex.accept(visitor) +rowtimeJoin + } + + override def copy( + traitSet: RelTraitSet, + conditionExpr: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { +new StreamExecTemporalTableJoin( + cluster, + traitSet, + left, + right, + conditionExpr, + joinType) + } + + //~ ExecNode methods --- + + override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = { +getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]]) + } + + override def replaceInputNode( +ordinalInParent: Int, +newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = { +replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) + } + + override protected def translateToPlanInternal( Review comment: Cons
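`requireWatermark` in the diff decides whether the join is event-time based. It looks for the `TEMPORAL_JOIN_CONDITION` marker call in the non-equi part of the join condition. `TemporalJoinUtil.isRowtimeCall` treats a 3-operand marker as rowtime and a 2-operand marker as proctime.

The Java sketch below reproduces that walk over a minimal expression tree. `Expr`, `Call` and `InputRef` are hypothetical stand-ins, not Calcite's `RexNode` API. The sketch assumes the condition contains at most one marker call. The Scala visitor records the result for whichever marker it visits; this sketch instead returns as soon as it meets a rowtime marker.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, minimal stand-ins for Calcite's RexNode / RexCall / RexInputRef.
interface Expr {}

final class Call implements Expr {
    final String operator;
    final List<Expr> operands;

    Call(String operator, Expr... operands) {
        this.operator = operator;
        this.operands = Arrays.asList(operands);
    }
}

final class InputRef implements Expr {
    final int index;

    InputRef(int index) {
        this.index = index;
    }
}

final class TemporalJoinDetector {
    static final String TEMPORAL_JOIN_CONDITION = "__TEMPORAL_JOIN_CONDITION";

    // Searches the non-equi join condition for the marker call. A marker with
    // 3 operands (left time, right time, key) means rowtime, so a watermark is needed;
    // 2 operands (left time, key) means proctime. Marker operands are not searched.
    static boolean requiresWatermark(Expr expr) {
        if (!(expr instanceof Call)) {
            return false;
        }
        Call call = (Call) expr;
        if (call.operator.equals(TEMPORAL_JOIN_CONDITION)) {
            return call.operands.size() == 3;
        }
        for (Expr operand : call.operands) {
            if (requiresWatermark(operand)) {
                return true;
            }
        }
        return false;
    }
}
```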
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298435221 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark this is a temporal tablefunction join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition. 
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + Review comment: empty line
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298436393 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Utilities for temporal table join + */ +object TemporalJoinUtil { + + // + // Temporal TableFunction Join Utilities + // + + /** +* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines +* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute. +* The condition is used to mark this is a temporal tablefunction join. +* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be +* extracted from the condition. 
+*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + + def isRowtimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 3 + } + + def isProctimeCall(call: RexCall): Boolean = { +checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION) +call.getOperands.size() == 2 + } + + def makeRowTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( +rexBuilder: RexBuilder, +leftTimeAttribute: RexNode, +rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + Review comment: empty line
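The operand checker passed to `TEMPORAL_JOIN_CONDITION` accepts either of two sequences:

* `(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)`, typed DATETIME, DATETIME, ANY.
* `(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)`, typed DATETIME, ANY.

The hypothetical Java checker below roughly illustrates what `OperandTypes.or` over two `OperandTypes.sequence` checkers means. It works over a simplified `Family` enum; Calcite's real checkers are considerably richer than this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified operand type families. ANY is used only inside patterns.
enum Family { DATETIME, NUMERIC, STRING, ANY }

final class TemporalConditionSignature {
    // Mirrors OperandTypes.or(sequence(DATETIME, DATETIME, ANY), sequence(DATETIME, ANY)).
    private static final List<List<Family>> ACCEPTED = Arrays.asList(
        Arrays.asList(Family.DATETIME, Family.DATETIME, Family.ANY), // (LEFT_TIME, RIGHT_TIME, PRIMARY_KEY)
        Arrays.asList(Family.DATETIME, Family.ANY));                 // (LEFT_TIME, PRIMARY_KEY)

    // Accepts the operands if they match at least one of the sequences.
    static boolean accepts(List<Family> operands) {
        for (List<Family> pattern : ACCEPTED) {
            if (matches(pattern, operands)) {
                return true;
            }
        }
        return false;
    }

    // A sequence matches when the arity is equal and every slot is ANY or the same family.
    private static boolean matches(List<Family> pattern, List<Family> operands) {
        if (pattern.size() != operands.size()) {
            return false;
        }
        for (int i = 0; i < pattern.size(); i++) {
            Family expected = pattern.get(i);
            if (expected != Family.ANY && expected != operands.get(i)) {
                return false;
            }
        }
        return true;
    }
}
```

The two accepted shapes have different arities. That is why `isRowtimeCall` and `isProctimeCall` can tell them apart by operand count alone.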
[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner URL: https://github.com/apache/flink/pull/8901#discussion_r298439071 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.physical.stream + +import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical._ +import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin +import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition +import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.JoinRelType + +import java.util + +class StreamExecTemporalUdtfJoinRule + extends RelOptRule( +operand( + classOf[FlinkLogicalJoin], Review comment: I am really confused about `LogicalCorrelateToTemporalTableJoinRule`. Does the temporal table join really stay a `FlinkLogicalJoin` until `StreamExecTemporalUdtfJoinRule`?
[jira] [Comment Edited] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys
[ https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874613#comment-16874613 ] Jark Wu edited comment on FLINK-12557 at 6/28/19 3:02 AM: -- I think about it again. Maybe it is still not very concise when the connector name is long, for example: "elasticsearch", "postgresql-binlog". {code:java} connector='postgresql-binlog', postgresql-binlog.property-version='1', postgresql-binlog.version='0.10', postgresql-binlog.url='...', postgresql-binlog.username='...', postgresql-binlog.password='...', format='json', json.property-version = '1', json.version='1', json.derive-schema='true', {code} How about making connection properties to be top-level and making other properties (e.g. format) to be structured. For example: {code:java} type='postgresql-binlog', property-version='1', version='0.10', url='...', username='...', password='...', format='json', format.property-version = '1', format.version='1', format.derive-schema='true' {code} What do you think [~danny0405] [~twalthr]? was (Author: jark): I think about it again. Maybe it is still not very concise when the connector name is long, for example: "elasticsearch", "postgresql-binlog". {code:java} connector='postgresql-binlog', postgresql-binlog.property-version='1', postgresql-binlog.version='0.10', postgresql-binlog.url='...', postgresql-binlog.username='...', postgresql-binlog.password='...', format='json', json.property-version = '1', json.version='1', json.derive-schema='true', {code} How about making connection properties to be top-level and making other properties (e.g. format) to be structured. 
For example: {code:java} type='postgresql-binlog', property-version='1', version='0.10', url='...', username='...', password='...', format='json', format.property-version = '1', format.version='1', format.derive-schema='true' {code} > Unify create table DDL with clause and connector descriptor keys > > > Key: FLINK-12557 > URL: https://issues.apache.org/jira/browse/FLINK-12557 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.8.0 >Reporter: Danny Chan >Assignee: Danny Chan >Priority: Major > Fix For: 1.9.0 > > > The *with* option in table DDL defines the properties needed for a specific > connector to create a TableSource/Sink. The property structure for the SqlClient > config YAML is defined in [Improvements to the Unified SQL Connector > API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf]; > in this design, the properties can be categorized into 4 parts: > > # Top level properties: name, type(source, sink), update-mode ... > # Connector specific properties: connector.type, connector.path ... > # Format properties: format.type, format.fields.1.name ... > # Table schema properties: (can be omitted for DDL) > > This property structure is reasonable for YAML, but it is not > concise enough for developers. There is also a tool class named > [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67] > to reconstruct data structures (like TableSchema) from the flat k-v > strings. > > So in order to reduce complexity and keep the KV consistency between DDL with > properties and TableFactory properties, I propose to simplify the DDL with > property keys as follows (corresponding to the 4 categories above): > > # Top level properties: keep the same as in the YAML, e.g. connector, > update-mode > # Connector specific properties: start with a prefix named after the connector type, > e.g. 
for the kafka connector, the properties are defined as kafka.k1 = v1, > kafka.k2 = v2 > # Format properties: format.type is simplified to format, and the others are > prefixed with the format name, e.g. format = 'json', json.line-delimiter = "\n" > # Table schema properties: omitted. > Here is a demo of the create table DDL: > {code:java} > CREATE TABLE Kafka10SourceTable ( > intField INTEGER, > stringField VARCHAR(128) COMMENT 'User IP address', > longField BIGINT, > rowTimeField TIMESTAMP, > WATERMARK wm01 FOR 'longField' AS BOUNDED WITH DELAY '60' SECOND > ) > COMMENT 'Kafka Source Table of topic user_ip_address' > WITH ( > connector='kafka', > kafka.property-version='1', > kafka.version='0.10', > kafka.topic='test-kafka-topic', > kafka.startup-mode = 'latest-offset', > kafka.specific-offset = 'offset', > format='json', > json.property-version = '1', > json.version='1', > json.derive-schema='true' > ) > {code} -- This message was sent by Atlas
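The FLINK-12557 description quoted above notes that `DescriptorProperties` reconstructs structured data (such as a `TableSchema`) from flat key-value strings like `format.fields.1.name`. The sketch below is a hypothetical helper that groups such indexed keys back into one record per index. It illustrates the key-naming scheme only and does not reproduce the actual methods of `DescriptorProperties`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; it is not DescriptorProperties' API.
final class IndexedProperties {
    // Groups keys of the form "<prefix>.<index>.<field>" (e.g. "format.fields.1.name")
    // into one field->value map per index, returned in ascending index order.
    static List<Map<String, String>> read(Map<String, String> props, String prefix) {
        TreeMap<Integer, Map<String, String>> byIndex = new TreeMap<>();
        String start = prefix + ".";
        for (Map.Entry<String, String> e : props.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(start)) {
                continue; // unrelated key, e.g. "format.type"
            }
            String rest = key.substring(start.length()); // e.g. "1.name"
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue; // not of the form "<index>.<field>"
            }
            int index;
            try {
                index = Integer.parseInt(rest.substring(0, dot));
            } catch (NumberFormatException ex) {
                continue; // segment after the prefix is not an index
            }
            byIndex.computeIfAbsent(index, i -> new HashMap<>())
                   .put(rest.substring(dot + 1), e.getValue());
        }
        return new ArrayList<>(byIndex.values());
    }
}
```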
[jira] [Comment Edited] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys
[ https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874613#comment-16874613 ] Jark Wu edited comment on FLINK-12557 at 6/28/19 3:00 AM: -- I think about it again. Maybe it is still not very concise when the connector name is long, for example: "elasticsearch", "postgresql-binlog". {code:java} connector='postgresql-binlog', postgresql-binlog.property-version='1', postgresql-binlog.version='0.10', postgresql-binlog.url='...', postgresql-binlog.username='...', postgresql-binlog.password='...', format='json', json.property-version = '1', json.version='1', json.derive-schema='true', {code} How about making connection properties to be top-level and making other properties (e.g. format) to be structured. For example: {code:java} type='postgresql-binlog', property-version='1', version='0.10', url='...', username='...', password='...', format='json', format.property-version = '1', format.version='1', format.derive-schema='true' {code} was (Author: jark): I think about it again. Maybe it is still not very concise when the connector name is long, for example: "elasticsearch", "postgresql-binlog". {code:java} connector='postgresql-binlog', postgresql-binlog.property-version='1', postgresql-binlog.version='0.10', postgresql-binlog.url='...', postgresql-binlog.username='...', postgresql-binlog.password='...' format='json' json.property-version = '1' json.version='1' json.derive-schema='true' {code} How about making connection properties to be top-level and making other properties (e.g. format) to be structured. For example: {code:java} type='postgresql-binlog', property-version='1', version='0.10', url='...', username='...', password='...' 
format='json' format.property-version = '1' format.version='1' format.derive-schema='true' {code}
[jira] [Commented] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys
[ https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874613#comment-16874613 ] Jark Wu commented on FLINK-12557: - I think about it again. Maybe it is still not very concise when the connector name is long, for example: "elasticsearch", "postgresql-binlog". {code:java} connector='postgresql-binlog', postgresql-binlog.property-version='1', postgresql-binlog.version='0.10', postgresql-binlog.url='...', postgresql-binlog.username='...', postgresql-binlog.password='...' format='json' json.property-version = '1' json.version='1' json.derive-schema='true' {code} How about making connection properties to be top-level and making other properties (e.g. format) to be structured. For example: {code:java} type='postgresql-binlog', property-version='1', version='0.10', url='...', username='...', password='...' format='json' format.property-version = '1' format.version='1' format.derive-schema='true' {code}
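The two layouts compared in Jark's comment on FLINK-12557 differ only in how keys are prefixed, so converting between them is mechanical. The sketch below is a hypothetical helper, not part of Flink's `DescriptorProperties` or `TableFactory` API. It rewrites the connector-name-prefixed layout into the proposed one:

* `connector` becomes `type`.
* Connector keys lose their prefix.
* Format keys are re-prefixed with the fixed word `format`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper contrasting the two key layouts discussed in FLINK-12557;
// it is not part of Flink's DescriptorProperties or TableFactory API.
final class PropertyLayout {

    // Input layout (keys prefixed with the connector/format name), e.g.
    //   connector='postgresql-binlog', postgresql-binlog.url='...', format='json', json.version='1'
    // Output layout (connector keys top-level, format keys under "format."), e.g.
    //   type='postgresql-binlog', url='...', format='json', format.version='1'
    static Map<String, String> toTopLevelConnectorLayout(Map<String, String> props) {
        String connector = props.get("connector");
        String format = props.get("format");
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            String key = e.getKey();
            if (key.equals("connector")) {
                result.put("type", e.getValue());
            } else if (connector != null && key.startsWith(connector + ".")) {
                // Connector properties drop their prefix and become top-level keys.
                result.put(key.substring(connector.length() + 1), e.getValue());
            } else if (format != null && key.startsWith(format + ".")) {
                // Format properties are re-prefixed with the fixed word "format".
                result.put("format." + key.substring(format.length() + 1), e.getValue());
            } else {
                // Anything else (e.g. "format" itself, "update-mode") is kept as-is.
                result.put(key, e.getValue());
            }
        }
        return result;
    }
}
```

The conversion makes one trade-off visible. Connector keys such as `version` or `url` now share the top-level namespace with keys like `update-mode`. A connector property therefore cannot reuse a name that is already a top-level key.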
[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506580756 > As an alternative, could we combine this with the hadoop 2.4 or scala 2.12 profile? Would you mind explaining how to combine it with the hadoop 2.4 or scala 2.12 profile? Does that mean the tests will run with different hadoop or scala versions? And will the tests run for each PR if we do that?
[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506580008 Hi @zentol, the `connector_hive_1` part of the build in my own repo took about 13 min and failed. That failure is expected because we are having some issues compiling with Hive-1.2.1, so I suppose the 13 min is the overhead of the forced recompilation. Once we fix the build, I expect the tests to take another 2-3 min. Of course, the testing time will increase as we add more test cases.
[GitHub] [flink] wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser URL: https://github.com/apache/flink/pull/8850#issuecomment-506579118 LGTM +1 to merge
[jira] [Commented] (FLINK-13020) UT Failure: ChainLengthDecreaseTest
[ https://issues.apache.org/jira/browse/FLINK-13020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874602#comment-16874602 ] Yun Tang commented on FLINK-13020: -- I think this is a duplicate of FLINK-12916. > UT Failure: ChainLengthDecreaseTest > --- > > Key: FLINK-13020 > URL: https://issues.apache.org/jira/browse/FLINK-13020 > Project: Flink > Issue Type: Improvement >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > {code:java} > 05:47:24.893 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 19.836 s <<< FAILURE! - in > org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest > 05:47:24.895 [ERROR] testMigrationAndRestore[Migrate Savepoint: > 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest) > Time elapsed: 1.501 s <<< ERROR! > java.util.concurrent.ExecutionException: > java.util.concurrent.CompletionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Task received > cancellation from one of its inputs > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Task received > cancellation from one of its inputs > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs > ... > 05:48:27.736 [ERROR] Errors: > 05:48:27.736 [ERROR] > ChainLengthDecreaseTest>AbstractOperatorRestoreTestBase.testMigrationAndRestore:102->AbstractOperatorRestoreTestBase.migrateJob:138 > » Execution > 05:48:27.736 [INFO] > {code} > https://travis-ci.org/apache/flink/jobs/551053821
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool
gaoyunhaii commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r298434963

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
##
@@ -347,20 +365,65 @@ private MultiTaskSlot(
 CompletableFuture slotContextFuture,
 @Nullable SlotRequestId allocatedSlotRequestId) {
 super(slotRequestId, groupId);
+ Preconditions.checkNotNull(slotContextFuture);
 this.parent = parent;
- this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
 this.allocatedSlotRequestId = allocatedSlotRequestId;
 this.children = new HashMap<>(16);
 this.releasingChildren = false;
- slotContextFuture.whenComplete(
- (SlotContext ignored, Throwable throwable) -> {
- if (throwable != null) {
- release(throwable);
+ this.requestedResources = ResourceProfile.EMPTY;
+
+ this.slotContextFuture = slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+ if (throwable != null) {
+ // If the underlying resource request fail, currently we fails all the requests to
+ // simplify the logic.
+ release(throwable);
+ throw new CompletionException(throwable);
+ }
+
+ if (parent == null) {
+ ResourceProfile allocated = ResourceProfile.EMPTY;
+ List childrenToEvict = new ArrayList<>();
+
+ for (TaskSlot slot : children.values()) {
+ ResourceProfile allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+ if (slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+ allocated = allocatedIfInclude;
+ } else {
+ childrenToEvict.add(slot);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not all requests are fulfilled due to over-allocated, number of requests is {}, "
+ + "number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, "
+ + "evicted requests is {},",
+ children.size(),
+ childrenToEvict.size(),
+ slotContext.getResourceProfile(),
+ allocated,
+ childrenToEvict);
 }
- });
+
+ if (childrenToEvict.size() == children.size()) {
+ // This only happens when we request to RM using the resource profile of a task
+ // who is belonging to a CoLocationGroup. Similar to dealing with the fail of

Review comment:
This is because the RM always returns a slot whose resources are larger than the requested ones. Without co-location, at least the request that triggered the request to the RM should be fulfilled. I have improved the code comments to make this clearer.
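The eviction loop in the diff merges each child's requested resources into a running total and evicts any child whose inclusion would exceed the slot the RM returned. A minimal sketch of that greedy loop, assuming a hypothetical two-dimensional `ResourceSketch` (CPU cores and memory) in place of Flink's `ResourceProfile` and plain string ids in place of `TaskSlot`s; the `merge`/`isMatching` names mirror the diff, but their component-wise semantics here are an assumption:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of greedy admission of child requests into one allocated slot. */
public class SlotEvictionSketch {

    /** Hypothetical stand-in for ResourceProfile with only CPU and memory. */
    static final class ResourceSketch {
        static final ResourceSketch EMPTY = new ResourceSketch(0.0, 0);

        final double cpuCores;
        final int memoryMb;

        ResourceSketch(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** Component-wise sum (assumed semantics). */
        ResourceSketch merge(ResourceSketch other) {
            return new ResourceSketch(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
        }

        /** True if this offered profile can hold the required one (assumed semantics). */
        boolean isMatching(ResourceSketch required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    /** Returns the ids of children that would over-allocate the slot, in iteration order. */
    static List<String> childrenToEvict(ResourceSketch allocatedSlot, Map<String, ResourceSketch> children) {
        ResourceSketch fulfilled = ResourceSketch.EMPTY;
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, ResourceSketch> child : children.entrySet()) {
            ResourceSketch ifIncluded = fulfilled.merge(child.getValue());
            if (allocatedSlot.isMatching(ifIncluded)) {
                fulfilled = ifIncluded; // child fits, keep it
            } else {
                evicted.add(child.getKey()); // including it would exceed the slot
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        Map<String, ResourceSketch> children = new LinkedHashMap<>();
        children.put("a", new ResourceSketch(1.0, 512));
        children.put("b", new ResourceSketch(1.0, 512));
        children.put("c", new ResourceSketch(1.0, 512));
        // The returned slot can hold two of the three requests.
        System.out.println(childrenToEvict(new ResourceSketch(2.0, 1024), children));
    }
}
```

In this greedy loop the first child examined is compared against the slot on its own, so as long as the slot is at least as large as that child's request, at least one child is kept. The sketch uses a `LinkedHashMap` for a deterministic order; the diff iterates a `HashMap`, whose order is unspecified.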
[GitHub] [flink] lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298434852

## File path: tools/travis/stage.sh
##
@@ -164,6 +168,9 @@ function get_compile_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $MODULES_TESTS -am"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -am -Phive-1.2.1 clean"

Review comment:
Yes
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298434626

## File path: docs/_includes/generated/blob_server_configuration.html
##
@@ -7,16 +7,16 @@
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob client.
+
+blob.client.socket.timeout

Review comment:
Sorry for the trouble here. I also noticed the same unrelated HTML changes in someone else's PR. I will double-check which PR is the root cause of this change; you can ignore this change in your PR.
[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side
[ https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-12171:
Affects Version/s: 1.9.0

> The network buffer memory size should not be checked against the heap size on the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.0, 1.9.0
> Environment: Flink-1.7.2; Flink-1.8 does not seem to have modified the logic here.
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently, when computing the network buffer memory size on the TM side in _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), the computed network buffer memory size is checked to be less than `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the maximum heap memory (namely -Xmx).
>
> With the above process, when the TM starts, -Xmx is computed in the RM or in _taskmanager.sh_ as (container memory - network buffer memory - managed memory), so the above check implies that the heap memory of the TM must be larger than the network memory, which does not seem necessary.
>
> This may cause the TM to use more memory than expected. For example, for a job with a large network throughput, users may configure the network memory to 2G. However, if users want to assign 1G to heap memory, the TM will fail to start, and the user has to allocate at least 2G of heap memory (in other words, 4G in total for the TM instead of 3G) to make the TM runnable. This may cause resource inefficiency.
>
> Therefore, I think the network buffer memory size also needs to be checked against the total memory instead of the heap memory on the TM side:
> # Check that networkBufFraction < 1.0.
> # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
> # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
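The three steps proposed in the issue can be sketched as a small standalone check. This is not Flink code: the method name `checkAgainstTotalMemory` is hypothetical, the parameter names only mirror the issue's wording, and the extra non-negativity check on the fraction is an assumption added for safety:

```java
/** Hypothetical sketch of the TM-side check proposed in FLINK-12171; not a Flink API. */
public class NetworkMemoryCheckSketch {

    /**
     * @param jvmHeapNoNetBytes  TM heap size (-Xmx), which already excludes network memory
     * @param networkBufFraction configured fraction of total memory used for network buffers
     * @param networkBufBytes    network buffer memory computed for this TM
     * @return the reconstructed total memory in bytes
     */
    static long checkAgainstTotalMemory(long jvmHeapNoNetBytes, double networkBufFraction, long networkBufBytes) {
        // Step 1: the fraction must be below 1.0 (non-negativity is an extra assumption).
        if (networkBufFraction < 0.0 || networkBufFraction >= 1.0) {
            throw new IllegalArgumentException("networkBufFraction must be in [0, 1), was " + networkBufFraction);
        }
        // Step 2: recover the total memory that the heap size was derived from.
        long totalBytes = (long) (jvmHeapNoNetBytes / (1.0 - networkBufFraction));
        // Step 3: compare network memory against the total rather than against the heap.
        if (networkBufBytes >= totalBytes) {
            throw new IllegalArgumentException("Network buffer memory (" + networkBufBytes
                + " bytes) must be smaller than the total memory (" + totalBytes + " bytes)");
        }
        return totalBytes;
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        // A 1G heap with a 0.75 network fraction implies 4G in total, so 3G of network
        // memory passes, although a check against the 1G heap alone would reject it.
        System.out.println(checkAgainstTotalMemory(gb, 0.75, 3 * gb) / gb);
    }
}
```

With the issue's own numbers (1G heap, 2G network memory, so a fraction of 2/3), the reconstructed total is roughly 3G and the check passes, up to floating-point rounding in step 2.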
[GitHub] [flink] zhijiangW commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create
zhijiangW commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create
URL: https://github.com/apache/flink/pull/8779#issuecomment-506574514

Thanks for the review, @azagrebin! @zentol, could you double-check and merge if there are no other concerns?
[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506574054

I'm not sure I fully follow what you mean, but I'll try to answer your question below. Please correct me if I have misunderstood.

- If you mean that the sync checkpoint/savepoint times out, then the `pendingCheckpoint` will be aborted, the `onCompletionPromise` will complete with an exception, and the job exits.
- Currently, stopping a job does not make the task exit. If you want the task to exit, you can use `cancel` instead of `stop`.
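The timeout path in the first bullet (abort the pending checkpoint, then complete its promise exceptionally) follows a common `CompletableFuture` pattern. A minimal sketch of that pattern, assuming hypothetical `PendingSavepoint` and `trigger` names; this is not Flink's `PendingCheckpoint` or `CheckpointCoordinator`, only the future mechanics:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a pending synchronous savepoint whose promise fails on timeout. */
public class SyncSavepointTimeoutSketch {

    /** Stand-in for a pending checkpoint; not Flink's PendingCheckpoint class. */
    static final class PendingSavepoint {
        final CompletableFuture<String> onCompletionPromise = new CompletableFuture<>();

        /** All tasks acknowledged: complete with a (hypothetical) savepoint path. */
        void acknowledgeAll(String savepointPath) {
            onCompletionPromise.complete(savepointPath);
        }

        /** Abort: completes the promise exceptionally; a no-op if it already completed. */
        void abort(Throwable cause) {
            onCompletionPromise.completeExceptionally(cause);
        }
    }

    /** Triggers a savepoint and schedules an abort once the timeout elapses. */
    static PendingSavepoint trigger(ScheduledExecutorService timer, long timeoutMillis) {
        PendingSavepoint pending = new PendingSavepoint();
        timer.schedule(
            () -> pending.abort(new TimeoutException("savepoint expired before completing")),
            timeoutMillis,
            TimeUnit.MILLISECONDS);
        return pending;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // No task ever acknowledges, so the timer aborts the savepoint.
            PendingSavepoint pending = trigger(timer, 50);
            String outcome = pending.onCompletionPromise
                .handle((path, error) -> error == null ? "completed: " + path : "failed: " + error)
                .join();
            System.out.println(outcome);
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Whoever waits on the promise (here `handle`) sees the timeout as an exceptional completion rather than blocking forever. Because `completeExceptionally` has no effect on an already-completed future, a late timer firing cannot overwrite a successful result.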
[jira] [Assigned] (FLINK-12994) Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext
[ https://issues.apache.org/jira/browse/FLINK-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Congxian Qiu(klion26) reassigned FLINK-12994:
-
Assignee: (was: Congxian Qiu(klion26))

> Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext
> -
>
> Key: FLINK-12994
> URL: https://issues.apache.org/jira/browse/FLINK-12994
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Reporter: Congxian Qiu(klion26)
> Priority: Minor
>
> This is a follow-up issue of FLINK-12536, please see the benchmark there for more information.
[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427114

## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##
@@ -583,6 +583,25 @@ public void testInvalidUpsertOverwrite() {
 "OVERWRITE expression is only used with INSERT mode");
 }
+ @Test
+ public void testCreateViewWithProperty() {

Review comment:
Done.
[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427036

## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateView.java
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+/**
+ * CREATE VIEW DDL sql call.
+ */
+public class SqlCreateView extends SqlCreate implements ExtendedSqlNode {
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW);
+
+ private final SqlIdentifier viewName;
+ private final SqlNodeList fieldList;
+ private final SqlNode query;
+ private final SqlCharStringLiteral comment;
+
+ public SqlCreateView(
+ SqlParserPos pos,
+ SqlIdentifier viewName,
+ SqlNodeList fieldList,
+ SqlNode query,
+ boolean replace,
+ SqlCharStringLiteral comment) {
+ super(OPERATOR, pos, replace, false);
+ this.viewName = viewName;
+ this.fieldList = fieldList;
+ this.query = query;
+ this.comment = comment;
+ }
+
+ @Override
+ public List getOperandList() {
+ List ops = Lists.newArrayList();
+ ops.add(viewName);
+ ops.add(fieldList);

Review comment:
Yes, we want to support this so that users can rename the fields.
[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298426924

## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##
@@ -334,6 +317,50 @@ void PartitionSpecCommaList(SqlNodeList list) :
 }
+/**
+* Parses a create view or replace existing view statement.
+* CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS select_statement
+*/
+SqlCreate SqlCreateView(Span s, boolean replace) : {
+boolean replaceView = false;
+SqlIdentifier viewName;
+SqlCharStringLiteral comment = null;
+SqlNode query;
+SqlNodeList fieldList = SqlNodeList.EMPTY;
+}
+{
+[ { replaceView = true; } ]

Review comment:
Yep, thanks.
[GitHub] [flink] stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
[GitHub] [flink] stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
[GitHub] [flink] flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ✅ 1. The [description] looks good.
    - Approved by @bowenli86
* ✅ 2. There is [consensus] that the contribution should go into Flink.
    - Approved by @bowenli86
* ❓ 3. Needs [attention] from.
* ✅ 4. The change fits into the overall [architecture].
    - Approved by @bowenli86
* ✅ 5. Overall code [quality] is good.
    - Approved by @bowenli86

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

 - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot approve-until architecture` to approve everything until `architecture`
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506552250

@xuefuz, thanks for your contribution! LGTM; I will merge once the Travis build passes.

@flinkbot approve all
[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298404869

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
##
@@ -68,7 +68,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
 RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-new FunctionCatalog("default_catalog", "default_database"))
+new FunctionCatalog(null))

Review comment:
@twalthr @dawidwys @JingsongLi are we able to pass in a CatalogManager here?
[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298405079

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##
@@ -116,15 +119,48 @@ public void registerScalarFunction(String name, ScalarFunction function) {
 }
 public String[] getUserDefinedFunctions() {
- return userFunctions.values().stream()
- .map(FunctionDefinition::toString)
- .toArray(String[]::new);
+ List result = new ArrayList<>();
+
+ // Get functions in catalog
+ Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+ try {
+ result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+ } catch (DatabaseNotExistException e) {
+ // Ignore since there will always be a current database of the current catalog
+ }
+
+ // Get built-in functions
+ result.addAll(
+ BuiltInFunctionDefinitions.getDefinitions()
+ .stream()
+ .map(f -> f.getName())
+ .collect(Collectors.toList()));
+
+ return result.stream()
+ .map(n -> normalizeName(n))
+ .collect(Collectors.toList())
+ .toArray(new String[0]);
 }
 @Override
 public Optional lookupFunction(String name) {
- final FunctionDefinition userCandidate = userFunctions.get(normalizeName(name));
+ String functionName = normalizeName(name);
+
+ Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+
+ FunctionDefinition userCandidate = null;
+ try {
+ CatalogFunction catalogFunction = catalog.getFunction(
+ new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+ // TODO: use FunctionDefintionFactory to initiate a FunctionDefinition from CatalogFunction

Review comment:
We need a FunctionDefinitionFactory here.
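The diff changes `lookupFunction` to resolve a normalized name against the current catalog and database (via `ObjectPath`) instead of an in-memory `userFunctions` map, and makes `getUserDefinedFunctions` return the catalog's functions plus the built-ins. A rough, string-based model of that resolution order, assuming catalog functions take precedence over built-ins (as the `userCandidate` naming suggests; the rest of the method is not shown) and that normalization upper-cases names. All class and method names below are hypothetical, and the sketch sidesteps the open TODO of turning a `CatalogFunction` into a `FunctionDefinition`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model: current database's catalog functions first, then built-ins. */
public class FunctionLookupSketch {

    // database -> normalized name -> definition (plain strings stand in for definitions)
    private final Map<String, Map<String, String>> catalogFunctions = new HashMap<>();
    private final Map<String, String> builtInFunctions = new HashMap<>();
    private String currentDatabase = "default_database";

    /** Case normalization; upper-casing is an assumption of this sketch. */
    static String normalizeName(String name) {
        return name.toUpperCase(Locale.ROOT);
    }

    void registerCatalogFunction(String database, String name, String definition) {
        catalogFunctions.computeIfAbsent(database, db -> new HashMap<>()).put(normalizeName(name), definition);
    }

    void registerBuiltIn(String name, String definition) {
        builtInFunctions.put(normalizeName(name), definition);
    }

    void setCurrentDatabase(String database) {
        currentDatabase = database;
    }

    /** Catalog functions of the current database shadow built-ins of the same name. */
    Optional<String> lookupFunction(String name) {
        String functionName = normalizeName(name);
        Map<String, String> inDatabase = catalogFunctions.get(currentDatabase);
        if (inDatabase != null && inDatabase.containsKey(functionName)) {
            return Optional.of(inDatabase.get(functionName));
        }
        return Optional.ofNullable(builtInFunctions.get(functionName));
    }

    /** Names visible from the current database, like getUserDefinedFunctions() (no de-duplication). */
    List<String> listFunctions() {
        List<String> result = new ArrayList<>();
        Map<String, String> inDatabase = catalogFunctions.get(currentDatabase);
        if (inDatabase != null) {
            result.addAll(inDatabase.keySet());
        }
        result.addAll(builtInFunctions.keySet());
        return result;
    }

    public static void main(String[] args) {
        FunctionLookupSketch functions = new FunctionLookupSketch();
        functions.registerBuiltIn("upper", "built-in UPPER");
        functions.registerCatalogFunction("default_database", "myUdf", "catalog myUdf");
        System.out.println(functions.lookupFunction("MYUDF"));
        System.out.println(functions.lookupFunction("Upper"));
        functions.setCurrentDatabase("other_database");
        System.out.println(functions.lookupFunction("myUdf"));
    }
}
```

Because lookups depend on the current database, switching databases changes which catalog functions are visible, while built-ins stay visible everywhere.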
[GitHub] [flink] flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

 - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot approve-until architecture` to approve everything until `architecture`
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545081

cc: @bowenli86, @terry, @lirui-apache
[jira] [Updated] (FLINK-13023) Generate HiveTableSource from from a Hive table
[ https://issues.apache.org/jira/browse/FLINK-13023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-13023:
---
Labels: pull-request-available (was: )

> Generate HiveTableSource from from a Hive table
> ---
>
> Key: FLINK-13023
> URL: https://issues.apache.org/jira/browse/FLINK-13023
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Xuefu Zhang
> Assignee: Xuefu Zhang
> Priority: Major
> Labels: pull-request-available
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to a table source that's used for data connector writing side.
[GitHub] [flink] xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921

## What is the purpose of the change

Provide an implementation for generating a HiveTableSource instance from a Hive table, as part of the Hive data connector work. Please note that this PR includes the changes in PR #8890.

## Brief change log

  - Added implementation in HiveTableFactory
  - Refactored HiveTableSource and HiveTableInputformat classes
  - Added corresponding tests

## Verifying this change

This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
[jira] [Created] (FLINK-13025) Elasticsearch 7.x support
Keegan Standifer created FLINK-13025:

Summary: Elasticsearch 7.x support
Key: FLINK-13025
URL: https://issues.apache.org/jira/browse/FLINK-13025
Project: Flink
Issue Type: New Feature
Components: Connectors / ElasticSearch
Affects Versions: 1.8.0
Reporter: Keegan Standifer

Elasticsearch 7.0.0 was released in April 2019: [https://www.elastic.co/blog/elasticsearch-7-0-0-released]

The latest Elasticsearch connector is [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6].
[GitHub] [flink] sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker
sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506542610

Gotcha, then +1.
[GitHub] [flink] NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen
NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen URL: https://github.com/apache/flink/pull/8886#discussion_r298395591 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java ## @@ -22,25 +22,30 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import java.util.concurrent.atomic.AtomicLong; + /** * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}. */ public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram { private final DescriptiveStatistics descriptiveStatistics; + private final AtomicLong elementsSeen = new AtomicLong(); Review comment: That depends on how users use it. Usually there should be one or more writer threads, e.g. from a task and potentially also user-spawned threads, and one reader (the metrics reporter).
[GitHub] [flink] NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen
NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen URL: https://github.com/apache/flink/pull/8886#discussion_r298395591 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java ## @@ -22,25 +22,30 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import java.util.concurrent.atomic.AtomicLong; + /** * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}. */ public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram { private final DescriptiveStatistics descriptiveStatistics; + private final AtomicLong elementsSeen = new AtomicLong(); Review comment: That depends on how users use it. Usually there should be one or more writer threads (tasks) and one reader (the metrics reporter).
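The threading model described in the review comment (one or more writer threads and a single metrics-reporter reader) is the reason a separate `AtomicLong` counter is a reasonable choice. The following is a minimal, JDK-only sketch under that assumption. The class `CountingWindowHistogram`, its `ArrayDeque`-based window, and the `retained()` method are hypothetical. They stand in for commons-math3's `DescriptiveStatistics`, which, when configured with a window size, keeps only the most recent values. This is not the code from the PR.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, JDK-only stand-in for a windowed histogram. This is NOT Flink's
 * DescriptiveStatisticsHistogram; it only illustrates the counting pattern.
 */
class CountingWindowHistogram {
    private final int capacity;
    // Most recent values only; guarded by the monitor of 'this'.
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    // Total number of update() calls, independent of the window capacity.
    // AtomicLong lets several writer threads increment it without a lock and
    // lets the reporter thread read it without extra synchronization.
    private final AtomicLong elementsSeen = new AtomicLong();

    CountingWindowHistogram(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Called by writer threads (e.g. a task and any threads it spawns). */
    void update(long value) {
        elementsSeen.incrementAndGet();
        synchronized (this) {
            if (window.size() == capacity) {
                window.pollFirst(); // evict the oldest value
            }
            window.addLast(value);
        }
    }

    /** Called by the reader (e.g. a metrics reporter): all elements ever seen. */
    long getCount() {
        return elementsSeen.get();
    }

    /** Number of values currently retained in the window. */
    synchronized int retained() {
        return window.size();
    }

    public static void main(String[] args) throws InterruptedException {
        CountingWindowHistogram h = new CountingWindowHistogram(100);
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    h.update(i);
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join(); // join() makes the writers' updates visible to this thread
        }
        // getCount() reports every update; retained() reports only the window.
        System.out.println(h.getCount() + " " + h.retained()); // prints "40000 100"
    }
}
```

Keeping the counter outside the window means `getCount()` reports every element seen, not just the number of values still in the window. The increment and the window update are not one atomic step, so a concurrent reader may briefly see the count run ahead of the window. That is usually acceptable for metrics reporting, but the real Flink change may make a different trade-off.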