[GitHub] [flink] wuchong commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-27 Thread GitBox
wuchong commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct 
the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
URL: https://github.com/apache/flink/pull/8909#issuecomment-506625995
 
 
   Thanks @Myasuka .
   
   LGTM.
   
   +1 to merge




[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298473800
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link BoundedData} that writes directly into a File Channel.
+ * The readers are simple file channel readers using a simple dedicated buffer pool.
+final class FileChannelBoundedData implements BoundedData {
+
+   private final Path filePath;
+
+   private final FileChannel fileChannel;
+
+   private final ByteBuffer[] headerAndBufferArray;
+
+   private long size;
+
+   private final int memorySegmentSize;
+
+   FileChannelBoundedData(
+   Path filePath,
+   FileChannel fileChannel,
+   int memorySegmentSize) {
+
+   this.filePath = checkNotNull(filePath);
+   this.fileChannel = checkNotNull(fileChannel);
+   this.memorySegmentSize = memorySegmentSize;
+   this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray();
+   }
+
+   @Override
+   public void writeBuffer(Buffer buffer) throws IOException {
+   size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, buffer, headerAndBufferArray);
+   }
+
+   @Override
+   public void finishWrite() throws IOException {
+   fileChannel.close();
+   }
+
+   @Override
+   public Reader createReader() throws IOException {
+   checkState(!fileChannel.isOpen());
+
+   final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ);
+   return new FileBufferReader(fc, memorySegmentSize);
+   }
+
+   @Override
+   public long getSize() {
+   return size;
+   }
+
+   @Override
+   public void close() throws IOException {
+   IOUtils.closeQuietly(fileChannel);
+   Files.delete(filePath);
+   }
+
+   // ------------------------------------------------------------------------
+
+   public static FileChannelBoundedData create(Path filePath, int memorySegmentSize) throws IOException {
+   final FileChannel fileChannel = FileChannel.open(
+   filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+
+   return new FileChannelBoundedData(
+   filePath,
+   fileChannel,
+   memorySegmentSize);
+   }
+
+   // ------------------------------------------------------------------------
+
+   static final class FileBufferReader implements BoundedData.Reader, BufferRecycler {
+
+   private static final int NUM_BUFFERS = 2;
+
+   private final FileChannel fileChannel;
+
+   private final ByteBuffer headerBuffer;
+
+   private final ArrayDeque<MemorySegment> buffers;
+
+   FileBufferReader(FileChannel fileChannel, int bufferSize) {
+   this.fileChannel = checkNotNull(fileChannel);
+   this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+   th
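
The constructor is cut off by the digest above. For orientation, here is a minimal usage sketch of the write-then-read lifecycle this class enforces (`createReader()` checks that the write channel is already closed). This is not code from the PR: the surrounding class and the `Buffer` construction are illustrative, and since `FileChannelBoundedData` is package-private the sketch assumes it lives in `org.apache.flink.runtime.io.network.partition`.

```java
// Hedged sketch, not part of the PR diff above.
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

import java.nio.file.Files;
import java.nio.file.Path;

class BoundedDataLifecycleSketch {

    static void writeThenRead() throws Exception {
        Path file = Files.createTempFile("subpartition", ".data");
        Files.delete(file); // create() uses CREATE_NEW, so the path must not exist yet

        FileChannelBoundedData data = FileChannelBoundedData.create(file, 32 * 1024);

        // Illustrative buffer construction: a 1 KB unpooled segment whose
        // recycler simply frees the segment.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
        Buffer buffer = new NetworkBuffer(segment, MemorySegment::free);
        buffer.setSize(1024); // mark the whole segment as readable payload

        data.writeBuffer(buffer); // appends a header plus the payload to the file channel
        data.finishWrite();       // closes the write channel ...
        BoundedData.Reader reader = data.createReader(); // ... which createReader() asserts

        Buffer read = reader.nextBuffer(); // served from the reader's dedicated buffers
        read.recycleBuffer();

        reader.close();
        data.close(); // deletes the backing file
    }
}
```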

[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298473554
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // ----------------------------------------------------------------------------------------
+  //  Temporal TableFunction Join Utilities
+  // ----------------------------------------------------------------------------------------
+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this as a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   Sorry, I misunderstood it. 
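
The body of `containsTemporalJoinCondition` is cut off at the review marker above. For orientation, a hedged Java sketch of the check it presumably performs (the real file is Scala, and the class and method names below are made up): recursively walk the condition tree and report whether any call uses the `TEMPORAL_JOIN_CONDITION` operator.

```java
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlOperator;

// Hedged sketch; not the actual Scala implementation under review.
final class TemporalConditionFinder extends RexVisitorImpl<Void> {

    private final SqlOperator target;
    private boolean found;

    private TemporalConditionFinder(SqlOperator target) {
        super(true); // deep = true: recurse into nested operands
        this.target = target;
    }

    @Override
    public Void visitCall(RexCall call) {
        if (call.getOperator() == target) {
            found = true;
            return null;
        }
        return super.visitCall(call); // continue the deep traversal
    }

    static boolean containsCall(RexNode condition, SqlOperator target) {
        TemporalConditionFinder finder = new TemporalConditionFinder(target);
        condition.accept(finder);
        return finder.found;
    }
}
```

A caller would pass `TemporalJoinUtil.TEMPORAL_JOIN_CONDITION` as the target operator.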




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298472910
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // ----------------------------------------------------------------------------------------
+  //  Temporal TableFunction Join Utilities
+  // ----------------------------------------------------------------------------------------
+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this as a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   separate method? I'm just suggesting that you get rid of this class.




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298472375
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
+  extends RelOptRule(
+operand(
+  classOf[FlinkLogicalJoin],
 
 Review comment:
   `LogicalCorrelateToLookupJoinRule` and `LogicalCorrelateToTemporalJoinRule`?




[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506621740
 
 
   Currently, Flink doesn't notify tasks about expired checkpoints; 
[FLINK-8871](https://issues.apache.org/jira/browse/FLINK-8871) will fix it. 
After FLINK-8871 has been fixed, we can do better here.
   Thanks for your comments.




[GitHub] [flink] wuchong commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on issue #8901: [FLINK-13003][table-planner-blink] Support 
Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506621197
 
 
   > Can you describe the rule processes of LookupJoin and TemporalTableJoin 
separately?
   > And I strongly suggest you change their class names, because they really 
confuse me...
   
   FOR SYSTEM_TIME AS OF:
   1. Logical: `LogicalCorrelate` with `LogicalSnapshot`
   2. `LogicalCorrelateToJoinFromTemporalTableRule`
   3. FlinkLogical: `LogicalJoin` with `LogicalSnapshot`
   4. `StreamExecLookupJoinRule`
   5. Physical: `StreamExecLookupJoin`
   
   LATERAL udtf(left.proctime):
   1. Logical: `LogicalCorrelate` with a `TemporalTableFunction`
   2. `LogicalCorrelateToJoinFromTemporalTableFunctionRule`
   3. FlinkLogical: `LogicalJoin` with a special join condition 
`TEMPORAL_JOIN_CONDITION`
   4. `StreamExecTemporalJoinRule`
   5. Physical: `StreamExecTemporalJoin`
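
For reference, a hedged sketch of the two query shapes these two pipelines start from (it assumes a `StreamTableEnvironment` with an `Orders` table, a `Rates` temporal table, and a `Rates` temporal table function registered; all table names are illustrative):

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TemporalJoinShapes {

    static void shapes(StreamTableEnvironment tEnv) {
        // FOR SYSTEM_TIME AS OF: LogicalCorrelate with LogicalSnapshot,
        // eventually planned as StreamExecLookupJoin (first list above).
        Table lookupJoin = tEnv.sqlQuery(
            "SELECT * FROM Orders AS o JOIN Rates FOR SYSTEM_TIME AS OF o.proctime AS r "
                + "ON o.currency = r.currency");

        // LATERAL TABLE(udtf(left.proctime)): LogicalCorrelate with a
        // TemporalTableFunction, joined via the special TEMPORAL_JOIN_CONDITION
        // and eventually planned as StreamExecTemporalJoin (second list above).
        Table temporalJoin = tEnv.sqlQuery(
            "SELECT * FROM Orders AS o, LATERAL TABLE (Rates(o.proctime)) AS r "
                + "WHERE o.currency = r.currency");
    }
}
```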




[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006
 
 
   The onCompletionPromise does complete with an exception if the 
pendingCheckpoint has been aborted, but the job **will not** exit (why would the 
job exit because of a checkpoint timeout?) and its tasks will not resume 
execution because you **don't** notify the syncLatch.
   For the second part, I don't want the tasks to exit. I just want them to 
unblock and resume execution when the sync checkpoint/savepoint times out.
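
For context, a minimal sketch of the behavior being requested (all names besides `syncLatch`, which the comment itself uses, are illustrative): the abort path must release the latch the tasks block on, not only the completion path.

```java
import java.util.concurrent.CountDownLatch;

// Hedged sketch of the requested behavior; not the actual PR code.
class SyncSavepointLatch {

    private final CountDownLatch syncLatch = new CountDownLatch(1);

    // Task thread: blocks while a synchronous (TERMINATE/SUSPEND) savepoint is pending.
    void awaitSyncSavepoint() throws InterruptedException {
        syncLatch.await();
        // ... unblock and resume processing here
    }

    // Completion path: releases the blocked task.
    void onCheckpointCompleted() {
        syncLatch.countDown();
    }

    // The missing piece the comment asks for: an aborted or expired
    // checkpoint/savepoint must also release the blocked task.
    void onCheckpointAborted() {
        syncLatch.countDown();
    }
}
```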




[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006
 
 
   The onCompletionPromise does complete with an exception if the 
pendingCheckpoint has been aborted, but the job **will not** exit (why would the 
job exit because of a checkpoint timeout?) and its tasks will not resume 
execution because you **don't** notify the syncLatch. For the second part, I 
don't want the tasks to exit. I just want them to unblock and resume execution 
when the sync checkpoint/savepoint times out.




[jira] [Commented] (FLINK-12921) Flink Job Scheduling

2019-06-27 Thread vim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874714#comment-16874714
 ] 

vim commented on FLINK-12921:
-

[~knaufk]  Thank you for your reply.

> Flink Job Scheduling
> 
>
> Key: FLINK-12921
> URL: https://issues.apache.org/jira/browse/FLINK-12921
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet, API / DataStream
>Affects Versions: 1.8.0
> Environment: flink 1.8.0
> jdk8
>Reporter: vim
>Priority: Major
>
> This is an example from flink-docs:
> Consider a program with a data source, a _MapFunction_, and a 
> _ReduceFunction_. The source and MapFunction are executed with a parallelism 
> of 4, while the ReduceFunction is executed with a parallelism of 3. A 
> pipeline consists of the sequence Source - Map - Reduce. On a cluster with 2 
> TaskManagers with 3 slots each, the program will be executed as described 
> below.
> !https://ci.apache.org/projects/flink/flink-docs-release-1.8/fig/slots.svg!
> But after I tried, I found that it was not like this. My result is that 
> TaskManager 1 used 3 slots, but TaskManager 2 just used 1 slot. Who else has 
> tested this example?





[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298468317
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/TemporalUdtfJoinTest.scala
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.stream.sql.join
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
+import org.hamcrest.Matchers.containsString
+import org.junit.Test
+
+class TemporalUdtfJoinTest extends TableTestBase {
 
 Review comment:
   Rename to `TemporalJoinTest` to align with `TemporalJoinITCase`.




[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006
 
 
   The onCompletionPromise does complete with an exception if the 
pendingCheckpoint has been aborted, but the job will not exit and its tasks will 
not resume execution because you **don't** notify the syncLatch. For the second 
part, I don't want the tasks to exit. I just want them to unblock and resume 
execution when the sync checkpoint/savepoint times out.




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298467837
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
+  extends RelOptRule(
+operand(
+  classOf[FlinkLogicalJoin],
 
 Review comment:
   Both the lookup join and the temporal table function join are correlates 
before optimization. We need to convert them to joins in the first step.
   I renamed them to `LogicalCorrelateToJoinFromTemporalTableRule` and 
`LogicalCorrelateToJoinFromTemporalTableFunctionRule`; what do you think?




[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506617887
 
 
   @azagrebin @zentol I submitted another hotfix commit for removing 
`IOManager#isProperlyShutDown`.




[GitHub] [flink] yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506618006
 
 
   The onCompletionPromise does complete with an exception if the 
pendingCheckpoint has been aborted, but the job will not exit and its tasks will 
not resume execution because you **don't** notify the syncLatch. For the second 
part, I don't want the tasks to exit. I just want them to unblock and resume 
execution.




[GitHub] [flink] asfgit closed pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities

2019-06-27 Thread GitBox
asfgit closed pull request #8913: [FLINK-12968][table-common] Preparation for 
more casting utilities
URL: https://github.com/apache/flink/pull/8913
 
 
   




[GitHub] [flink] zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add 
Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298457662
 
 

 ##
 File path: tools/travis/stage.sh
 ##
 @@ -212,6 +219,9 @@ function get_test_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $modules_tests"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -Phive-1.2.1"
 
 Review comment:
   on a side-note, if you would include clean here you'd only recompile the 
hive module, but it's quite a hack.




[GitHub] [flink] zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506606238
 
 
   To integrate it into the hadoop profile, modify the profile of these 
[builds](https://github.com/apache/flink/blob/master/.travis.yml#L119) to 
include the hive version.




[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874685#comment-16874685
 ] 

Chesnay Schepler commented on FLINK-10701:
--

The long-term goal is still to have Kafka confined to a single profile, which 
still isn't the case.

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector}} profile in stage.sh; click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.
> *This issue is blocked by FLINK-10603.*





[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI

2019-06-27 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874684#comment-16874684
 ] 

Dian Fu commented on FLINK-13011:
-

[~sunjincheng121] [~Zentol] I have contacted the owner of 
[https://pypi.org/project/pyflink/] and have obtained ownership of this project. 

> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
> have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
> PyFlink distribution package built by FLINK-12962 to PyPI. 
> https://pypi.org/
> https://packaging.python.org/tutorials/packaging-projects/
>  





[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298456137
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping on the state a collection of probe and build records to
+ * process on the next watermark. The idea is that between watermarks we are collecting
+ * those elements, and once we are sure that there will be no updates, we emit the correct
+ * result and clean up the state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+ * defined as older than the current watermark. The build side is also cleaned up in a
+ * similar fashion, however we always keep at least one record - the latest one - even if
+ * it's past the last watermark.
+ *
+ * One more trick is how emitting results and cleaning up are triggered. It is achieved by
+ * registering timers for the keys. We could register a timer for every probe and build
+ * side element's event time (when the watermark exceeds this timer, that's when we emit
+ * and/or clean up the state). However, this would cause a huge number of registered
+ * timers. For example, with the following event times of probe records accumulated:
+ * {1, 2, 5, 8, 9}, if we had received Watermark(10), it would trigger 5 separate timers
+ * for the same key. To avoid that, we always keep only one single registered timer for
+ * any given key, registered for the minimal value. Upon triggering it, we process all
+ * records with event times older than or equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   I will support state ttl in `TemporalRowTimeJoinOperator` too.
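
For illustration, a hedged sketch of the "single registered timer per key" trick described in the Javadoc above (the fields `registeredTimer` and `timerService` are assumptions about the operator's state, not verbatim PR code):

```java
// Hedged sketch; assumes a per-key ValueState<Long> registeredTimer holding the
// currently registered trigger time and an InternalTimerService<VoidNamespace>
// timerService, as suggested by the imports in the diff above.
private void registerSmallestTimer(long eventTime) throws java.io.IOException {
    Long currentRegisteredTimer = registeredTimer.value();
    if (currentRegisteredTimer == null || eventTime < currentRegisteredTimer) {
        if (currentRegisteredTimer != null) {
            // drop the old timer, so each key keeps exactly one registered timer
            timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, currentRegisteredTimer);
        }
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, eventTime);
        registeredTimer.update(eventTime);
    }
}
```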




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298454608
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
 ##
 @@ -117,7 +117,9 @@ public void open() throws Exception {
@Override
public void close() throws Exception {
super.close();
-   joinCondition.backingJoinCondition.close();
+   if (joinCondition != null) {
 
 Review comment:
   I think we should protect it in each close() method. The compiled instance 
may not have been instantiated when close() is invoked (e.g., if an exception 
happens before instantiation).
   
   This change is not related to this pull request. 
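
A hedged sketch of the failure mode described above (the `joinCondition`/`backingJoinCondition` names follow the diff; the open() body and the wrapper type are illustrative): if instantiation of the generated condition throws inside open(), the field stays null, yet close() still runs during cleanup.

```java
// Hedged sketch; only the null guard mirrors the diff, the rest is illustrative.
@Override
public void open() throws Exception {
    super.open();
    // Code generation / instantiation may throw here ...
    JoinCondition condition =
        generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
    // ... in which case this assignment is never reached:
    this.joinCondition = new JoinConditionWithNullFilters(condition);
}

@Override
public void close() throws Exception {
    super.close();
    if (joinCondition != null) { // guard against a failed open()
        joinCondition.backingJoinCondition.close();
    }
}
```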




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298454129
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // ----------------------------------------------------------------------------------------
+  //  Temporal TableFunction Join Utilities
+  // ----------------------------------------------------------------------------------------
+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this as a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   Emmm. I think a separate method to check the condition is cleaner here. 




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453939
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods ---------------
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
 
 Review comment:
   I would

[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453583
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
 ##
 @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, 
operationTree: QueryOperation) e
   override def select(fields: Expression*): Table = ???
 
   override def createTemporalTableFunction(
 
 Review comment:
   The other way (using an Expression parameter) will be supported when we 
integrate table-common's `TableImpl`, and we will add some tests after that.




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453442
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment(
   }
 }
 
-fields.zipWithIndex.foreach {
-  case ("rowtime", idx) =>
-extractRowtime(idx, "rowtime", None)
-
-  case ("proctime", idx) =>
-extractProctime(idx, "proctime")
-
-  case (name, _) => fieldNames = name :: fieldNames
+fields.zipWithIndex.foreach { case (name, idx) =>
 
 Review comment:
   Yes. We hacked this to make the test util support "o_rowtime" as a rowtime 
attribute. Currently, we can't invoke the rowtime/proctime function in 
blink-planner. 




[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667
 ] 

Yun Gao edited comment on FLINK-12852 at 6/28/19 4:45 AM:
--

Here are all the possible methods to solve this problem that come to mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how much to 
reserve for the exclusive buffers, especially since it is hard to know how many 
tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same for all the local 
buffer pools: It may leave previously running jobs unable to run because the 
total number of buffers is less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream still may not acquire any buffers to 
make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, I think currently we need to use the last method. I'd like to:
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Transfer the timeout to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout (a sketch follows below).
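
A minimal sketch of that last step (the queue type matches the stack trace below; the method shape, exception text, and recycling step are illustrative, not the final implementation):

{code:java}
// Hedged sketch of a bounded requestMemorySegments; assumes java.io.IOException,
// java.time.Duration, java.util.*, and java.util.concurrent.* are imported.
List<MemorySegment> requestMemorySegments(
        ArrayBlockingQueue<MemorySegment> availableMemorySegments,
        int numRequiredSegments,
        Duration requestSegmentsTimeout) throws IOException, InterruptedException {

    List<MemorySegment> segments = new ArrayList<>(numRequiredSegments);
    long deadline = System.nanoTime() + requestSegmentsTimeout.toNanos();

    while (segments.size() < numRequiredSegments) {
        MemorySegment segment = availableMemorySegments.poll(
                deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        if (segment == null) {
            // return the partially acquired segments so other tasks can proceed
            availableMemorySegments.addAll(segments);
            throw new IOException("Insufficient buffer: timed out after "
                    + requestSegmentsTimeout + " while requesting "
                    + numRequiredSegments + " exclusive network buffers.");
        }
        segments.add(segment);
    }
    return segments;
}
{code}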


was (Author: gaoyunhaii):
Here are all the possible methods to solve this problem that come to mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how much to 
reserve for the exclusive buffers, especially since it is hard to know how many 
tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same: It may leave 
previously running jobs unable to run because the total number of buffers is 
less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream still may not acquire any buffers to 
make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, I think currently we need to use the last method. I'd like to:
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Transfer the timeout to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation; when assignExclusiveSegments runs, 
> there is no available memory.
>  
> The details of the scenario are as follows: The parallelisms of the upstream 
> vertex and the downstream vertex are 1000 and 500 respectively. There are 20

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667
 ] 

Yun Gao commented on FLINK-12852:
-

All the possible methods to solve this problem that come to mind: 
 # Reserve buffers for the exclusive buffers: it is hard to know how many 
buffers to reserve, especially since it is hard to know how many tasks will be 
scheduled to this TM.
 # Make the required buffers and the max buffers the same: it may leave 
previously running jobs unable to run, because the total number of buffers 
would be less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: it does not solve the 
deadlock problem, since the downstream may still not acquire any buffers to 
make progress.
 # Add a timeout for the _requestMemorySegments._

Therefore, I think we currently need to use the last method (see the sketch 
below). I'd like to:
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout on to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.
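
To make the proposal concrete, here is a minimal sketch of the timeout-bounded 
request. It assumes the blocking queue of available segments that 
NetworkBufferPool already polls; apart from requestMemorySegments, all names 
are illustrative stand-ins rather than the final API:
{code:java}
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedSegmentRequestSketch {

    // stand-in for NetworkBufferPool's queue of free memory segments
    private final BlockingQueue<Object> availableMemorySegments =
            new ArrayBlockingQueue<>(1024);

    /** Requests numRequired segments, failing instead of blocking forever. */
    public List<Object> requestMemorySegments(int numRequired, Duration timeout)
            throws IOException {
        List<Object> segments = new ArrayList<>(numRequired);
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (segments.size() < numRequired) {
                Object segment = availableMemorySegments.poll(
                        deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (segment == null) {
                    // timed out: give back what we took so the buffers are not
                    // leaked, then fail instead of dead-locking
                    segments.forEach(availableMemorySegments::offer);
                    throw new IOException("Insufficient buffer: only "
                            + segments.size() + " of " + numRequired
                            + " segments acquired within " + timeout);
                }
                segments.add(segment);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            segments.forEach(availableMemorySegments::offer);
            throw new IOException("Interrupted while requesting memory segments", e);
        }
        return segments;
    }
}
{code}
Recycling the partially acquired segments before throwing matters here: 
otherwise a failing downstream task would keep holding buffers and make the 
shortage worse for the next attempt.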

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not 
> the same, so there may be over-allocation; when assignExclusiveSegments is 
> called, there is no available memory.
>  
> The details of the scenario are as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1000 and 500 respectively. There are 200 
> TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers, but only 10696 - 9 * 990 = 1786 are 
> left. Since not all downstream tasks can start, the job is eventually 
> blocked, no buffer can be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.





[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667
 ] 

Yun Gao edited comment on FLINK-12852 at 6/28/19 4:38 AM:
--

All the possible methods to solve this problem that come to mind: 
 # Reserve buffers for the exclusive buffers: it is hard to know how many 
buffers to reserve, especially since it is hard to know how many tasks will be 
scheduled to this TM.
 # Make the required buffers and the max buffers the same: it may leave 
previously running jobs unable to run, because the total number of buffers 
would be less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: it does not solve the 
deadlock problem, since the downstream may still not acquire any buffers to 
make progress.
 # Add a timeout for the _requestMemorySegments._

Therefore, I think we currently need to use the last method. I'd like to:
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout on to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.


was (Author: gaoyunhaii):
All the possible methods to solve this problem that come to mind: 
 # Reserve buffers for the exclusive buffers: it is hard to know how many 
buffers to reserve, especially since it is hard to know how many tasks will be 
scheduled to this TM.
 # Make the required buffers and the max buffers the same: it may leave 
previously running jobs unable to run, because the total number of buffers 
would be less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: it does not solve the 
deadlock problem, since the downstream may still not acquire any buffers to 
make progress.
 # Add a timeout for the _requestMemorySegments._

Therefore, __ I think __ we currently need to use the last method. I'd like to:
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout on to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not 
> the same, so there may be over-allocation; when assignExclusiveSegments is 
> called, there is no available memory.
>  
> The details of the scenario are as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1000 and 500 respectively. There are 200 TMs 
> and each TM has 1

[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506595119
 
 
   > sorry, should have asked @zentol to help review.
   > 
   > @lirui-apache seems like I might misunderstood how the stage is run? 
Shouldn't the profile only recompile and retest flink-connector-hive module?
   
   The approach in this PR will recompile flink-connector-hive and all the 
modules it depends on (`-am` asks Maven to also build the projects that the 
listed module depends on). To recompile only flink-connector-hive, we can 
remove the `-am` option. My understanding is that if we remove `-am`, the 
dependency modules will be unavailable (or downloaded from somewhere that 
doesn't reflect the changes in a PR) because the local Maven repo is cleared 
for each Travis job. @zentol please correct me if I misunderstand.
   
   Another alternative is to skip the recompilation completely and only run the 
tests against Hive-1.2.1. That way we get rid of most of the overhead while 
still being able to catch the issues with Hive-1.2.1. The drawback of this 
approach is that errors that happen during tests are probably harder to 
understand than errors that happen during compilation. For example, we'd get 
`NoClassDefFoundError` instead of `cannot find symbol`.




[GitHub] [flink] JingsongLi commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator

2019-06-27 Thread GitBox
JingsongLi commented on issue #8821:  [FLINK-12922][Table SQL / Planner] Remove 
method parameter from OperatorCodeGenerator
URL: https://github.com/apache/flink/pull/8821#issuecomment-506592750
 
 
   LGTM +1. @liyafan82 can you squash these commits into one?




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298446897
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -73,7 +73,7 @@ public void before() {
 
@Test
public void testCreateTable() {
-   check("CREATE TABLE tbl1 (\n" +
+   check("CREATE TABLE db.tbl1 (\n" +
 
 Review comment:
   A mistake ...




[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro

2019-06-27 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13009:
--
Affects Version/s: 1.8.0

> YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  throws NPE on Travis
> -
>
> Key: FLINK-13009
> URL: https://issues.apache.org/jira/browse/FLINK-13009
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The test 
> {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
>  throws NPE on Travis.
> The NPE is thrown from RMAppAttemptMetrics.java#128, presumably because 
> rmContext.getRMApps().get(attemptId.getApplicationId()) returns null; the 
> following is the code from hadoop-2.8.3 [1]:
> {code:java}
> // Only add in the running containers if this is the active attempt.
> 128   RMAppAttempt currentAttempt = rmContext.getRMApps()
> 129   .get(attemptId.getApplicationId()).getCurrentAppAttempt();
> {code}
>  
> log [https://api.travis-ci.org/v3/job/550689578/log.txt]
> [1] 
> [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128]





[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12852:

Description: 
When running tests with an upstream vertex and downstream vertex, deadlock 
occurs when submitting the job:
{code:java}
"Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
nid=0x38845 waiting on condition [0x7f2cbe9fe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00073ed6b6f0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
- locked <0x00073fbc81f0> (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
at 
org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
at java.lang.Thread.run(Thread.java:834)
{code}
This is because the required and max sizes of the local buffer pool are not the 
same, so there may be over-allocation; when assignExclusiveSegments is called, 
there is no available memory.

 

The details of the scenario are as follows: the parallelisms of the upstream 
vertex and the downstream vertex are 1000 and 500 respectively. There are 200 
TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 
upstream tasks and 1 downstream task, the 9 upstream tasks start first with a 
local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce data 
quickly and each occupies about 990 buffers. Then the downstream task starts 
and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It 
requires 2981 buffers, but only 1786 are left. Since not all downstream tasks 
can start, the job is eventually blocked, no buffer can be released, and the 
deadlock occurs.

 

I think that although increasing the network memory solves the problem, the 
deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) 
can solve this problem, but AFAIK in 1.9 it will not include the network memory 
in the ResourceProfile.

  was:
When running tests with an upstream vertex and downstream vertex, deadlock 
occurs when submitting the job:
{code:java}
"Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
nid=0x38845 waiting on condition [0x7f2cbe9fe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00073ed6b6f0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
- locked <0x00073fbc81f0> (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
at 
org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
at java.lang.Thread.run(Thread.java:834)
{code}
This is because the required and max sizes of the local buffer pool are not the 
same, so there may be over-allocation; when assignExclusiveSegments is called, 
there is no available memory.

 

The details of the scenario are as follows: the parallelisms of the upstream 
vertex and the downstream vertex are 1000 and 500 respectively. There are 200 
TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 
upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
local buffe

[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435255
 
 

 ##
 File path: docs/dev/table/common.zh.md
 ##
 @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable")
 // execute
 env.execute()
 
+{% endhighlight %}
+
+
+
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of 
StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)   # or
+table_env.register_table_source("table2", ...) # or
 
 Review comment:
   Delete the "# or"?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435312
 
 

 ##
 File path: docs/dev/table/common.zh.md
 ##
 @@ -392,6 +486,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single 
tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala 
implicits. Make sure to import `org.apache.flink.api.scala._` and 
`org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 
+
+
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+.filter("cCountry === 'FRANCE'")
 
 Review comment:
   Should we add a \ after the filter and group_by method calls?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298440322
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -316,7 +316,7 @@ value NOT IN (sub-query)
 
 
 
-
+
 
 Review comment:
   In line 432, "STRING.similar(STRING)" was already wrong before this change. 
Can you fix it along the way, both in this file and in the corresponding zh.md 
file? Thank you.
   STRING.similar(STRING) -> STRING1.similar(STRING2)




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298430168
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable")
 // execute
 env.execute()
 
+{% endhighlight %}
+
+
+
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of 
StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)   # or
+table_env.register_table_source("table2", ...) # or
 
 Review comment:
   Delete the "# or"?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298438156
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -170,6 +181,50 @@ tableEnvironment
 {% endhighlight %}
 
 
+
+{% highlight python %}
+table_environment \
+.connect(  # declare the external system to connect to
+ Kafka()
 
 Review comment:
   The code style doesn't satisfy E126 (continuation line over-indented for 
hanging indent), and there are similar problems in other code snippets. Is 
there a better code style?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298441273
 
 

 ##
 File path: docs/dev/table/streaming/query_configuration.md
 ##
 @@ -135,6 +160,16 @@ val qConfig: StreamQueryConfig = ???
 // set idle state retention time: min = 12 hours, max = 24 hours
 qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
+{% endhighlight %}
+
+
+{% highlight python %}
+
+q_config = ...
 
 Review comment:
   Add a comment noting that the type of the variable q_config is 
"StreamQueryConfig".




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435022
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -392,6 +486,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single 
tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala 
implicits. Make sure to import `org.apache.flink.api.scala._` and 
`org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 
+
+
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+.filter("cCountry === 'FRANCE'")
 
 Review comment:
   Should we add a \ after the filter and group_by method calls?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442961
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping in state the collection of probe and build
+ * records to process on the next watermark. The idea is that between
+ * watermarks we collect those elements, and once we are sure that there will
+ * be no more updates we emit the correct result and clean up the state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side,
+ * where "old" is defined as older than the current watermark. The build side
+ * is also cleaned up in a similar fashion; however, we always keep at least
+ * one record - the latest one - even if it's past the last watermark.
+ *
+ * One more trick is how emitting results and cleaning up are triggered. This
+ * is achieved by registering timers for the keys. We could register a timer
+ * for every probe and build side element's event time (when the watermark
+ * exceeds this timer, that's when we emit and/or clean up the state). However,
+ * this would cause a huge number of registered timers. For example, with the
+ * following eventTimes of probe records accumulated: {1, 2, 5, 8, 9}, receiving
+ * Watermark(10) would trigger 5 separate timers for the same key. To avoid
+ * that, we always keep only a single registered timer for any given key,
+ * registered for the minimal value. Upon triggering it, we process all records
+ * with event times older than or equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   The `TemporalRowTimeJoinOperator` logic is a bit complicated; consider 
adding a `TemporalRowTimeJoinOperatorTest`?
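
   As a side note for reviewers, the timer-coalescing trick described in the 
javadoc above can be illustrated with a small self-contained sketch; the state 
and timer-service stand-ins below are assumptions for illustration, not the 
operator's actual fields:

```java
import java.util.TreeSet;

/**
 * Self-contained sketch of "one registered timer per key, at the minimal
 * pending event time". The Long field stands in for a per-key ValueState,
 * and the TreeSet stands in for the timer service's pending timers.
 */
public class SingleTimerPerKeySketch {

    private Long registeredTimer;
    private final TreeSet<Long> pendingTimers = new TreeSet<>();

    /** Called for every incoming record with its event time. */
    public void registerSmallestTimer(long eventTime) {
        if (registeredTimer == null || eventTime < registeredTimer) {
            if (registeredTimer != null) {
                // drop the later timer so at most one stays registered per key
                pendingTimers.remove(registeredTimer);
            }
            pendingTimers.add(eventTime);
            registeredTimer = eventTime;
        }
    }

    /** Called once the watermark passes the registered timer. */
    public void onTimer(long triggeredTime) {
        pendingTimers.remove(triggeredTime);
        registeredTimer = null;
        // ...process all records with eventTime <= currentWatermark here,
        // then re-register for the smallest remaining event time, if any
    }
}
```

   With the javadoc's example of accumulated probe times {1, 2, 5, 8, 9}, only 
the timer for 1 ends up registered, so Watermark(10) fires a single onTimer 
call that drains the whole batch instead of five separate ones.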




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298440940
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
 ##
 @@ -117,7 +117,9 @@ public void open() throws Exception {
@Override
public void close() throws Exception {
super.close();
-   joinCondition.backingJoinCondition.close();
+   if (joinCondition != null) {
 
 Review comment:
   Is there any case where `joinCondition` is not initialized?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298440480
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/TemporalUdtfJoinTest.scala
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.stream.sql.join
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
+import org.hamcrest.Matchers.containsString
+import org.junit.Test
+
+class TemporalUdtfJoinTest extends TableTestBase {
 
 Review comment:
   About "udtf": there is nothing user-defined here; maybe use "table function" 
instead?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442473
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping in state the collection of probe and build
+ * records to process on the next watermark. The idea is that between
+ * watermarks we collect those elements, and once we are sure that there will
+ * be no more updates we emit the correct result and clean up the state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side,
+ * where "old" is defined as older than the current watermark. The build side
+ * is also cleaned up in a similar fashion; however, we always keep at least
+ * one record - the latest one - even if it's past the last watermark.
+ *
+ * One more trick is how emitting results and cleaning up are triggered. This
+ * is achieved by registering timers for the keys. We could register a timer
+ * for every probe and build side element's event time (when the watermark
+ * exceeds this timer, that's when we emit and/or clean up the state). However,
+ * this would cause a huge number of registered timers. For example, with the
+ * following eventTimes of probe records accumulated: {1, 2, 5, 8, 9}, receiving
+ * Watermark(10) would trigger 5 separate timers for the same key. To avoid
+ * that, we always keep only a single registered timer for any given key,
+ * registered for the minimal value. Upon triggering it, we process all records
+ * with event times older than or equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   It seems that `TemporalRowTimeJoinOperator` does not deal with 
`RetentionTime`; maybe you should add a TODO or implement the retention 
mechanism.




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298441480
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, 
TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import 
org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, 
isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import 
org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, 
RelExplainUtil, TemporalJoinUtil}
+import 
org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, 
TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods 
---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
+tableEnv: StreamTable

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442640
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping in state the collection of probe and build
+ * records to process on the next watermark. The idea is that between
+ * watermarks we collect those elements, and once we are sure that there will
+ * be no more updates we emit the correct result and clean up the state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side,
+ * where "old" is defined as older than the current watermark. The build side
+ * is also cleaned up in a similar fashion; however, we always keep at least
+ * one record - the latest one - even if it's past the last watermark.
+ *
+ * One more trick is how emitting results and cleaning up are triggered. This
+ * is achieved by registering timers for the keys. We could register a timer
+ * for every probe and build side element's event time (when the watermark
+ * exceeds this timer, that's when we emit and/or clean up the state). However,
+ * this would cause a huge number of registered timers. For example, with the
+ * following eventTimes of probe records accumulated: {1, 2, 5, 8, 9}, receiving
+ * Watermark(10) would trigger 5 separate timers for the same key. To avoid
+ * that, we always keep only a single registered timer for any given key,
+ * registered for the minimal value. Upon triggering it, we process all records
+ * with event times older than or equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+   extends AbstractStreamOperator
+   implements TwoInputStreamOperator, 
Triggerable {
+
+   private static final long serialVersionUID = 6642514795175288193L;
+
+   private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+   private static final String LEFT_STATE_NAME = "left";
+   private static final String RIGHT_STATE_NAME = "right";
+   private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+   private static final String TIMERS_STATE_NAME = "timers";
+
+   private final BaseRowTypeInfo leftType;
+   private final BaseRowTypeInfo rightType;
+   private final Generat

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298441284
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * An abstract {@link TwoInputStreamOperator} that allows its subclasses to
+ * clean up their state based on a TTL. This TTL should be specified in the
+ * provided {@code minRetentionTime} and {@code maxRetentionTime}.
+ *
+ * For each known key, this operator registers a timer (in processing time)
+ * to fire after the TTL expires. When the timer fires, the subclass can decide
+ * which state to clean up and what further action to take.
+ *
+ * This class takes care of maintaining at most one timer per key.
+ *
+ * IMPORTANT NOTE TO USERS: When extending this class, do not use processing
+ * time timers in your business logic. The reasons are that:
+ *
+ * 1) if your timers collide with clean-up timers and you delete them, then
+ * state clean-up will not be performed, and
+ *
+ * 2) (this one is the reason why this class does not allow overriding the
+ * onProcessingTime())
 
 Review comment:
   Why not make `onProcessingTime` final?
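
   The "at most one timer per key" contract described in the javadoc above can 
be sketched as below; the condition mirrors the usual cleanup-timer pattern in 
Flink's runtime, but the map and set are stand-ins assumed for illustration, 
not this class's actual state:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Self-contained sketch of TTL clean-up with at most one timer per key. */
public class CleanupTimerSketch {

    private final long minRetentionTime;
    private final long maxRetentionTime;

    // stand-in for the per-key ValueState<Long> holding the registered cleanup time
    private final Map<String, Long> cleanupTimePerKey = new HashMap<>();
    // stand-in for the processing-time timer service
    private final TreeSet<Long> pendingTimers = new TreeSet<>();

    public CleanupTimerSketch(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
    }

    /** Called on every record for the key; only moves the timer when needed. */
    public void registerCleanupTimer(String key, long currentTime) {
        Long current = cleanupTimePerKey.get(key);
        // re-register only if the existing timer would fire too early; this
        // bounds timer churn while still guaranteeing cleanup no later than
        // currentTime + maxRetentionTime after the last access
        if (current == null || currentTime + minRetentionTime > current) {
            long newCleanupTime = currentTime + maxRetentionTime;
            pendingTimers.add(newCleanupTime);
            if (current != null) {
                pendingTimers.remove(current); // keep a single live timer per key
            }
            cleanupTimePerKey.put(key, newCleanupTime);
        }
    }
}
```

   This is also why subclasses must not register their own processing-time 
timers: a business-logic timer for the same key would be indistinguishable from 
the clean-up timer tracked here.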




[GitHub] [flink] bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506587670
 
 
   sorry, should have asked @zentol to help review.
   
   @lirui-apache seems like I might have misunderstood how the stage is run? 
Shouldn't the profile only recompile and retest the flink-connector-hive module?




[jira] [Comment Edited] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874616#comment-16874616
 ] 

vinoyang edited comment on FLINK-10701 at 6/28/19 3:18 AM:
---

Hi [~Zentol], I have seen a separate test profile named "kafka/gelly" in 
Travis. It seems this issue is no longer meaningful, right?


was (Author: yanghua):
Hi [~Zentol], I have seen a separate test profile in Travis. It seems this 
issue is no longer meaningful, right?

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector}} profile in stage.sh; click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.
> *This issue is blocked by FLINK-10603.*





[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874616#comment-16874616
 ] 

vinoyang commented on FLINK-10701:
--

Hi [~Zentol] I have seen a separate test profile in Travis. It seems this 
issue is no longer meaningful, right?

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector}} profile in stage.sh; click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.
> *This issue is blocked by FLINK-10603.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] 
Support Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule processes of LookupJoin and TemporalTableJoin 
separately?
   And I strongly suggest you change their class names, because they really 
confuse me...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support 
Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule processes of LookupJoin and TemporalTableJoin 
separately?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13025) Elasticsearch 7.x support

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874614#comment-16874614
 ] 

vinoyang commented on FLINK-13025:
--

Thank you for opening this issue [~Keegan-CloudImperium], I was going to open 
it and try to implement it.
Hi [~aljoscha], have we ever thought about providing a universal Elasticsearch 
connector like the universal Kafka connector? Is this idea practical?

> Elasticsearch 7.x support
> -
>
> Key: FLINK-13025
> URL: https://issues.apache.org/jira/browse/FLINK-13025
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.8.0
>Reporter: Keegan Standifer
>Priority: Major
>
> Elasticsearch 7.0.0 was released in April of 2019: 
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is 
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298430341
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class TemporalTableFunctionJoinITCase(state: StateBackendMode)
 
 Review comment:
   Why not just `TemporalJoinITCase`, to align with flink-planner?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298435721
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import 
org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import 
org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
 
 Review comment:
   Why not use `StreamExecTemporalTableJoinRule`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298429826
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -73,7 +73,7 @@ public void before() {
 
@Test
public void testCreateTable() {
-   check("CREATE TABLE tbl1 (\n" +
+   check("CREATE TABLE db.tbl1 (\n" +
 
 Review comment:
   Why?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298431205
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment(
   }
 }
 
-fields.zipWithIndex.foreach {
-  case ("rowtime", idx) =>
-extractRowtime(idx, "rowtime", None)
-
-  case ("proctime", idx) =>
-extractProctime(idx, "proctime")
-
-  case (name, _) => fieldNames = name :: fieldNames
+fields.zipWithIndex.foreach { case (name, idx) =>
 
 Review comment:
   Why do we change this? It seems that flink-planner does not have this logic? 
Does flink-planner need to invoke the rowtime/proctime function to define a time attribute?
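   
   For reference, a hedged sketch of the field-suffix style used to declare time attributes from the Java Table API, assuming a `StreamTableEnvironment tEnv` and the `DataStream` variables below are in scope:
   ```
   // "proctime" is just a field name here; the ".proctime" suffix is what
   // marks the field as a processing-time attribute.
   Table orders = tEnv.fromDataStream(orderStream, "amount, currency, proctime.proctime");
   
   // Event time instead: expose the stream's timestamps as a rowtime attribute.
   Table rates = tEnv.fromDataStream(rateStream, "currency, rate, rowtime.rowtime");
   ```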


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298436775
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly 
defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute.
+* The condition is used to mark that this is a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   NIT: use anonymous class?
   ```
   def containsTemporalJoinCondition(condition: RexNode): Boolean = {
     var hasTemporalJoinCondition: Boolean = false
     condition.accept(new RexVisitorImpl[Void](true) {
       override def visitCall(call: RexCall): Void = {
         if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
           super.visitCall(call)
         } else {
           hasTemporalJoinCondition = true
           null
         }
       }
     })
     hasTemporalJoinCondition
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298430439
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class TemporalTableFunctionJoinITCase(state: StateBackendMode)
+  extends StreamingWithStateTestBase(state) {
+
+  /**
+* Because of the nature of processing time, we cannot (or at least it is 
not that easy to)
+* validate the result here. Instead, we are just testing 
whether there are no
+* exceptions in a full-blown ITCase. Actual correctness is tested in unit 
tests.
+*/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+val sqlQuery =
+  """
+|SELECT
+|  o.amount * r.rate AS amount
+|FROM
+|  Orders AS o,
+|  LATERAL TABLE (Rates(o.proctime)) AS r
+|WHERE r.currency = o.currency
+|""".stripMargin
+
+val ordersData = new mutable.MutableList[(Long, String)]
+ordersData.+=((2L, "Euro"))
+ordersData.+=((1L, "US Dollar"))
+ordersData.+=((50L, "Yen"))
+ordersData.+=((3L, "Euro"))
+ordersData.+=((5L, "US Dollar"))
+
+val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ratesHistoryData.+=(("US Dollar", 102L))
+ratesHistoryData.+=(("Euro", 114L))
+ratesHistoryData.+=(("Yen", 1L))
+ratesHistoryData.+=(("Euro", 116L))
+ratesHistoryData.+=(("Euro", 119L))
+
+val orders = env
+  .fromCollection(ordersData)
+  .toTable(tEnv, 'amount, 'currency, 'proctime)
+val ratesHistory = env
+  .fromCollection(ratesHistoryData)
+  .toTable(tEnv, 'currency, 'rate, 'proctime)
+
+tEnv.registerTable("Orders", orders)
+tEnv.registerTable("RatesHistory", ratesHistory)
+tEnv.registerFunction(
+  "Rates",
+  ratesHistory.createTemporalTableFunction("proctime", "currency"))
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new TestingAppendSink)
+env.execute()
+  }
+
+  @Test
+  def testEventTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val sqlQuery =
+  """
+|SELECT
+|  o.amount * r.rate AS amount
+|FROM
+|  Orders AS o,
+|  LATERAL TABLE (Rates(o.rowtime)) AS r
+|WHERE r.currency = o.currency
+|""".stripMargin
+
 
 Review comment:
   remove empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298431452
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
 ##
 @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, 
operationTree: QueryOperation) e
   override def select(fields: Expression*): Table = ???
 
   override def createTemporalTableFunction(
 
 Review comment:
   It seems that the only way to use a temporal join is through the Table class.
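   
   As a hedged illustration of that path, assuming a `StreamTableEnvironment tEnv` and a `Table ratesHistory` are in scope, the Java side would look roughly like:
   ```
   // Build a temporal table function versioned by "proctime" and keyed by
   // "currency", then register it so SQL can call LATERAL TABLE (Rates(...)).
   TemporalTableFunction rates =
       ratesHistory.createTemporalTableFunction("proctime", "currency");
   tEnv.registerFunction("Rates", rates);
   ```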


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298432231
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, 
TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import 
org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, 
isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import 
org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, 
RelExplainUtil, TemporalJoinUtil}
+import 
org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, 
TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods 
---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
 
 Review comment:
   Cons

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298435221
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly 
defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute.
+* The condition is used to mark that this is a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
 
 Review comment:
   empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298436393
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly 
defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute.
+* The condition is used to mark that this is a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and 
leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
 
 Review comment:
   empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298439071
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import 
org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import 
org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
+  extends RelOptRule(
+operand(
+  classOf[FlinkLogicalJoin],
 
 Review comment:
   I am really confused about `LogicalCorrelateToTemporalTableJoinRule`... It 
seems that the temporal table join really stays a `FlinkLogicalJoin` until 
`StreamExecTemporalUdtfJoinRule`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874613#comment-16874613
 ] 

Jark Wu edited comment on FLINK-12557 at 6/28/19 3:02 AM:
--

I thought about it again. Maybe it is still not very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true',
{code}

How about making connector properties top-level and making other 
properties (e.g. format) structured? For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}

What do you think [~danny0405] [~twalthr]?
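
A rough sketch of how a factory could split such flat keys, in plain Java with hypothetical values, just to illustrate the proposed layout:

{code:java}
// Top-level connector keys stay flat; "format."-prefixed keys are grouped
// into their own map before being handed to the format factory.
Map<String, String> properties = new HashMap<>();
properties.put("type", "postgresql-binlog");
properties.put("url", "...");
properties.put("format", "json");
properties.put("format.derive-schema", "true");

Map<String, String> formatProperties = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
    if (entry.getKey().startsWith("format.")) {
        formatProperties.put(
            entry.getKey().substring("format.".length()), entry.getValue());
    }
}
// formatProperties now contains {derive-schema=true}; "format" itself stays
// top-level as the format identifier.
{code}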



was (Author: jark):
I think about it again. Maybe it is still not very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true',
{code}

How about making connection properties to be top-level and making other 
properties (e.g. format) to be structured. For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}




> Unify create table DDL with clause and connector descriptor keys
> 
>
> Key: FLINK-12557
> URL: https://issues.apache.org/jira/browse/FLINK-12557
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
> Fix For: 1.9.0
>
>
> The *with* option in table DDL defines the properties needed for a specific 
> connector to create a TableSource/Sink. The properties structure for SqlClient 
> config YAML is defined in [Improvements to the Unified SQL Connector 
> API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  in this design, the properties can be categorized into 4 parts:
>  
>  # Top level properties: name, type(source, sink), update-mode ...
>  # Connector specific properties: connector.type, connector.path ...
>  # Format properties: format.type, format.fields.1.name ...
>  # Table schema properties: (can be omitted for DDL)
>  
> This properties structure is reasonable for YAML, but it is not that 
> concise for developers. So there is also a tool class named 
> [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67]
>  to reconstruct the data structure(like TableSchema) from the flat k-v 
> strings.
>  
> So in order to reduce complexity and keep the KV consistency between DDL with 
> properties and TableFactory properties, I proposed to simplify the DDL with 
> properties keys as follows (corresponding to the above 4 categories):
>  
>  # Top level properties: keep same as that in the YAML e.g. connector, 
> update-mode
>  # Connector specific properties: start with prefix named the connector type 
> e.g. for kafka connector, the properties are defined as kafka.k1 = v1, 
> kafka.k2 = v2
>  # Format properties: format.type simplified to format and the others with 
> prefix of the format name e.g. format = 'json', json.line-delimiter = "\n"
>  # Table schema properties: omitted.
> Here is a demo of a create table DDL:
> {code:java}
> CREATE TABLE Kafka10SourceTable (
>   intField INTEGER,
>   stringField VARCHAR(128) COMMENT 'User IP address',
>   longField BIGINT,
>   rowTimeField TIMESTAMP,
>   WATERMARK wm01 FOR  'longField' AS BOUNDED WITH DELAY '60' SECOND
> )
> COMMENT 'Kafka Source Table of topic user_ip_address'
> WITH (
>   connector='kafka',
>   kafka.property-version='1',
>   kafka.version='0.10',
>   kafka.topic='test-kafka-topic',
>   kafka.startup-mode = 'latest-offset'
>   kafka.specific-offset = 'offset'
>   format='json'
>   json.property-version = '1'
>   json.version='1'
>   json.derive-schema='true'
> )
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874613#comment-16874613
 ] 

Jark Wu commented on FLINK-12557:
-

I thought about it again. Maybe it is still not very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...'
format='json'
json.property-version = '1'
json.version='1'
json.derive-schema='true'
{code}

How about making connector properties top-level and making other 
properties (e.g. format) structured? For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...'
format='json'
format.property-version = '1'
format.version='1'
format.derive-schema='true'
{code}




> Unify create table DDL with clause and connector descriptor keys
> 
>
> Key: FLINK-12557
> URL: https://issues.apache.org/jira/browse/FLINK-12557
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
> Fix For: 1.9.0
>
>
> The *with* option in table DDL defines the properties needed for a specific 
> connector to create a TableSource/Sink. The properties structure for SqlClient 
> config YAML is defined in [Improvements to the Unified SQL Connector 
> API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  in this design, the properties can be categorized into 4 parts:
>  
>  # Top level properties: name, type(source, sink), update-mode ...
>  # Connector specific properties: connector.type, connector.path ...
>  # Format properties: format.type, format.fields.1.name ...
>  # Table schema properties: (can be omitted for DDL)
>  
> This properties structure is reasonable for YAML, but it is not that 
> concise for developers. So there is also a tool class named 
> [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67]
>  to reconstruct the data structure(like TableSchema) from the flat k-v 
> strings.
>  
> So in order to reduce complexity and keep the KV consistency between DDL with 
> properties and TableFactory properties, I proposed to simplify the DDL with 
> properties keys as follows (corresponding to the above 4 categories):
>  
>  # Top level properties: keep same as that in the YAML e.g. connector, 
> update-mode
>  # Connector specific properties: start with prefix named the connector type 
> e.g. for kafka connector, the properties are defined as kafka.k1 = v1, 
> kafka.k2 = v2
>  # Format properties: format.type simplified to format and the others with 
> prefix of the format name e.g. format = 'json', json.line-delimiter = "\n"
>  # Table schema properties: omitted.
> Here is a demo of a create table DDL:
> {code:java}
> CREATE TABLE Kafka10SourceTable (
>   intField INTEGER,
>   stringField VARCHAR(128) COMMENT 'User IP address',
>   longField BIGINT,
>   rowTimeField TIMESTAMP,
>   WATERMARK wm01 FOR  'longField' AS BOUNDED WITH DELAY '60' SECOND
> )
> COMMENT 'Kafka Source Table of topic user_ip_address'
> WITH (
>   connector='kafka',
>   kafka.property-version='1',
>   kafka.version='0.10',
>   kafka.topic='test-kafka-topic',
>   kafka.startup-mode = 'latest-offset'
>   kafka.specific-offset = 'offset'
>   format='json'
>   json.property-version = '1'
>   json.version='1'
>   json.derive-schema='true'
> )
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506580756
 
 
   > As an alternative, could we combine this with the hadoop 2.4 or scala 2.12 
profile?
   
   Would you mind explaining how to combine it with hadoop 2.4 or scala 2.12? Does 
it mean the test will run with different hadoop or scala versions? And will the 
test run for each PR if we do that?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506580008
 
 
   Hi @zentol, the `connector_hive_1` in the build of my own repo took about 13 
min and failed, which is expected because we're having some issues 
compiling with Hive-1.2.1. So I suppose that's the overhead of the forced 
recompilation. Once we fix the build, I expect the testing to take another 2~3 
min. Of course the testing time will increase as we add more test cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view 
grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#issuecomment-506579118
 
 
   LGTM
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13020) UT Failure: ChainLengthDecreaseTest

2019-06-27 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874602#comment-16874602
 ] 

Yun Tang commented on FLINK-13020:
--

I think this is a duplicate of FLINK-12916.

> UT Failure: ChainLengthDecreaseTest
> ---
>
> Key: FLINK-13020
> URL: https://issues.apache.org/jira/browse/FLINK-13020
> Project: Flink
>  Issue Type: Improvement
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> {code:java}
> 05:47:24.893 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 19.836 s <<< FAILURE! - in 
> org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest
> 05:47:24.895 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
> 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest)
>   Time elapsed: 1.501 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs
> ...
> 05:48:27.736 [ERROR] Errors: 
> 05:48:27.736 [ERROR]   
> ChainLengthDecreaseTest>AbstractOperatorRestoreTestBase.testMigrationAndRestore:102->AbstractOperatorRestoreTestBase.migrateJob:138
>  » Execution
> 05:48:27.736 [INFO] 
> {code}
> https://travis-ci.org/apache/flink/jobs/551053821



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-27 Thread GitBox
gaoyunhaii commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r298434963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -347,20 +365,65 @@ private MultiTaskSlot(
CompletableFuture 
slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId) 
{
super(slotRequestId, groupId);
+   Preconditions.checkNotNull(slotContextFuture);
 
this.parent = parent;
-   this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
 
this.children = new HashMap<>(16);
this.releasingChildren = false;
 
-   slotContextFuture.whenComplete(
-   (SlotContext ignored, Throwable throwable) -> {
-   if (throwable != null) {
-   release(throwable);
+   this.requestedResources = ResourceProfile.EMPTY;
+
+   this.slotContextFuture = 
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+   if (throwable != null) {
+   // If the underlying resource request fails, we currently fail all the
+   // requests to simplify the logic.
+   release(throwable);
+   throw new 
CompletionException(throwable);
+   }
+
+   if (parent == null) {
+   ResourceProfile allocated = 
ResourceProfile.EMPTY;
+   List<TaskSlot> childrenToEvict = new ArrayList<>();
+
+   for (TaskSlot slot : children.values()) 
{
+   ResourceProfile 
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+   if 
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+   allocated = 
allocatedIfInclude;
+   } else {
+   
childrenToEvict.add(slot);
+   }
+   }
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Not all requests are 
fulfilled due to over-allocated, number of requests is {}, " +
+   
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, 
" +
+   
"evicted requests is {},",
+   children.size(),
+   
childrenToEvict.size(),
+   
slotContext.getResourceProfile(),
+   allocated,
+   
childrenToEvict);
}
-   });
+
+   if (childrenToEvict.size() == 
children.size()) {
+   // This only happens when we 
request to RM using the resource profile of a task
+   // who is belonging to a 
CoLocationGroup. Similar to dealing with the fail of
 
 Review comment:
   This is because the RM always returns a slot whose resources are at least 
as large as the requested ones; without co-location, at least the request that 
triggered the allocation from the RM should be fulfilled. The comments have 
been enhanced to make this clearer.
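
   For illustration, the loop from the diff restated as a minimal sketch 
(`selectChildrenToEvict` is a hypothetical name; `ResourceProfile`, `TaskSlot` 
and `SlotContext` are the Flink types used in the diff above):

```java
// Greedy admission: keep a request while the merged profile still fits the
// slot the RM returned; evict the rest. Because the RM returns a slot at
// least as large as the triggering request, the first request always fits
// (absent co-location constraints).
static List<TaskSlot> selectChildrenToEvict(Collection<TaskSlot> children, SlotContext slotContext) {
	ResourceProfile allocated = ResourceProfile.EMPTY;
	List<TaskSlot> childrenToEvict = new ArrayList<>();
	for (TaskSlot slot : children) {
		ResourceProfile merged = allocated.merge(slot.getRequestedResources());
		if (slotContext.getResourceProfile().isMatching(merged)) {
			allocated = merged;        // this request stays fulfilled
		} else {
			childrenToEvict.add(slot); // over-allocated: evict this request
		}
	}
	return childrenToEvict;
}
```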


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] 
Add Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298434852
 
 

 ##
 File path: tools/travis/stage.sh
 ##
 @@ -164,6 +168,9 @@ function get_compile_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $MODULES_TESTS -am"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -am -Phive-1.2.1 clean"
 
 Review comment:
   Yes


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-27 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298434626
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob 
client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   Sorry for bringing you trouble here. I also found another person causing 
the same issue with unrelated HTML changes. I will double-check which PR 
actually caused this change; you can ignore it in your PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-06-27 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12171:

Affects Version/s: 1.9.0

> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
> Environment: Flink-1.7.2; Flink-1.8 does not seem to have modified the 
> logic here.
>  
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
> 
> With the above process, when the TM starts, -Xmx is computed in the RM or in 
> _taskmanager.sh_ as (container memory - network buffer memory - managed 
> memory); thus the above check implies that the heap memory of the TM must be 
> larger than the network memory, which seems unnecessary.
> 
> This may cause the TM to use more memory than expected. For example, for a 
> job with large network throughput, users may configure 2G of network memory. 
> However, if they then want to assign only 1G to heap memory, the TM will 
> fail to start; the user has to allocate at least 2G of heap memory (in other 
> words, 4G in total for the TM instead of 3G) to make the TM runnable. This 
> wastes resources.
> 
> Therefore, I think the network buffer memory size should also be checked 
> against the total memory instead of the heap memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
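
For illustration only, a minimal sketch of the proposed check (hypothetical 
method and parameter names; not the actual Flink code):

{code:java}
// Validate the configured network buffer size against the derived total
// memory rather than against the JVM heap (-Xmx), following the steps above.
static void checkNetworkBufferMemory(long networkBufBytes, long jvmHeapNoNetBytes, double networkBufFraction) {
	// 1. the fraction must be < 1.0
	if (networkBufFraction >= 1.0) {
		throw new IllegalArgumentException(
			"Network buffer fraction must be < 1.0, but was " + networkBufFraction);
	}
	// 2. total memory = jvmHeapNoNet / (1 - networkBufFraction)
	long totalMemoryBytes = (long) (jvmHeapNoNetBytes / (1.0 - networkBufFraction));
	// 3. the network buffer memory must fit into the total memory
	if (networkBufBytes > totalMemoryBytes) {
		throw new IllegalArgumentException("Configured network buffer memory ("
			+ networkBufBytes + " bytes) exceeds the derived total memory ("
			+ totalMemoryBytes + " bytes).");
	}
}
{code}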



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-27 Thread GitBox
zhijiangW commented on issue #8779: [FLINK-12882][network] Remove 
ExecutionAttemptID argument from ResultPartitionFactory#create
URL: https://github.com/apache/flink/pull/8779#issuecomment-506574514
 
 
   Thanks for the review @azagrebin!
   @zentol could you help double-check and merge if there are no other concerns?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506574054
 
 
   I'm not sure I fully follow what you mean; in the following I try to answer 
your question. Please correct me if I've misunderstood.
   - If you mean the sync checkpoint/savepoint times out: then the 
`pendingCheckpoint` will be aborted, the `onCompletionPromis` will complete 
with an exception, and the job exits (see the sketch below).
   - Currently, when stopping a job, the task will not exit; if you want the 
task to exit, you can use `cancel` instead of `stop`.
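
   For illustration, a minimal sketch of the timeout path described above 
(hypothetical class and method names, simplified types; not Flink's actual 
code):

```java
import java.util.concurrent.CompletableFuture;

class SyncCheckpointIllustration {
	// the promise a stop-with-checkpoint caller waits on
	private final CompletableFuture<String> onCompletionPromise = new CompletableFuture<>();

	// invoked when the pending sync checkpoint is aborted, e.g. on timeout
	void onPendingCheckpointAborted(Throwable cause) {
		// the waiting caller observes the exception and the job exits with it
		onCompletionPromise.completeExceptionally(cause);
	}
}
```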


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12994) Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext

2019-06-27 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) reassigned FLINK-12994:
-

Assignee: (was: Congxian Qiu(klion26))

> Improve the buffer processing performance in 
> SpilledBufferOrEventSequence#getNext
> -
>
> Key: FLINK-12994
> URL: https://issues.apache.org/jira/browse/FLINK-12994
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Minor
>
> This is a follow-up issue of FLINK-12536; please see the benchmark there for 
> more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427114
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -583,6 +583,25 @@ public void testInvalidUpsertOverwrite() {
"OVERWRITE expression is only used with INSERT mode");
}
 
+   @Test
+   public void testCreateViewWithProperty() {
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427036
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateView.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+/**
+ * CREATE VIEW DDL sql call.
+ */
+public class SqlCreateView extends SqlCreate implements ExtendedSqlNode {
+   public static final SqlSpecialOperator OPERATOR = new 
SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW);
+
+   private final SqlIdentifier viewName;
+   private final SqlNodeList fieldList;
+   private final SqlNode query;
+   private final SqlCharStringLiteral comment;
+
+   public SqlCreateView(
+   SqlParserPos pos,
+   SqlIdentifier viewName,
+   SqlNodeList fieldList,
+   SqlNode query,
+   boolean replace,
+   SqlCharStringLiteral comment) {
+   super(OPERATOR, pos, replace, false);
+   this.viewName = viewName;
+   this.fieldList = fieldList;
+   this.query = query;
+   this.comment = comment;
+   }
+
+   @Override
+   public List<SqlNode> getOperandList() {
+   List<SqlNode> ops = Lists.newArrayList();
+   ops.add(viewName);
+   ops.add(fieldList);
 
 Review comment:
   Yes, we want to support this, so the user can rename the fields (see the 
example below).
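
   For illustration (a hypothetical statement, not taken from the PR's tests), 
the optional field list renames the view's columns:

```java
// The view exposes columns `a` and `b`, renaming the query's `x` and `y`.
String ddl = "CREATE VIEW my_view (a, b) AS SELECT x, y FROM my_table";
```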


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298426924
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -334,6 +317,50 @@ void PartitionSpecCommaList(SqlNodeList list) :
 
 }
 
+/**
+* Parses a create view or replace existing view statement.
+*   CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS 
select_statement
+*/
+SqlCreate SqlCreateView(Span s, boolean replace) : {
+boolean replaceView = false;
+SqlIdentifier viewName;
+SqlCharStringLiteral comment = null;
+SqlNode query;
+SqlNodeList fieldList = SqlNodeList.EMPTY;
+}
+{
+[ <OR> <REPLACE> { replaceView = true; } ]
 
 Review comment:
   Yep, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-27 Thread GitBox
stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include 
the whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-27 Thread GitBox
stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the 
whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

2019-06-27 Thread GitBox
flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate 
HiveTableSource from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @bowenli86
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @bowenli86
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @bowenli86
   * ✅ 5. Overall code [quality] is good.
   - Approved by @bowenli86
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

2019-06-27 Thread GitBox
bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate 
HiveTableSource from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506552250
 
 
   @xuefuz thanks for your contribution!
   
   LGTM, will merge once Travis build passes
   
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298404869
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
 ##
 @@ -68,7 +68,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
   RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-new FunctionCatalog("default_catalog", "default_database"))
+new FunctionCatalog(null))
 
 Review comment:
   @twalthr @dawidwys @JingsongLi are we able to pass in CatalogManager here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298405079
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##
 @@ -116,15 +119,48 @@ public void registerScalarFunction(String name, 
ScalarFunction function) {
}
 
public String[] getUserDefinedFunctions() {
-   return userFunctions.values().stream()
-   .map(FunctionDefinition::toString)
-   .toArray(String[]::new);
+   List<String> result = new ArrayList<>();
+
+   // Get functions in catalog
+   Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+   try {
+   
result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+   } catch (DatabaseNotExistException e) {
+   // Ignore since there will always be a current database 
of the current catalog
+   }
+
+   // Get built-in functions
+   result.addAll(
+   BuiltInFunctionDefinitions.getDefinitions()
+   .stream()
+   .map(f -> f.getName())
+   .collect(Collectors.toList()));
+
+   return result.stream()
+   .map(n -> normalizeName(n))
+   .collect(Collectors.toList())
+   .toArray(new String[0]);
}
 
@Override
public Optional lookupFunction(String name) {
-   final FunctionDefinition userCandidate = 
userFunctions.get(normalizeName(name));
+   String functionName = normalizeName(name);
+
+   Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+
+   FunctionDefinition userCandidate = null;
+   try {
+   CatalogFunction catalogFunction = catalog.getFunction(
+   new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+   // TODO: use FunctionDefintionFactory to initiate a 
FunctionDefinition from CatalogFunction
 
 Review comment:
   we need FunctionDefintionFactory here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

2019-06-27 Thread GitBox
flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource 
from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

2019-06-27 Thread GitBox
xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource 
from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545081
 
 
   cc: @bowenli86, @terry, @lirui-apache 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13023) Generate HiveTableSource from a Hive table

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13023:
---
Labels: pull-request-available  (was: )

> Generate HiveTableSource from a Hive table
> ---
>
> Key: FLINK-13023
> URL: https://issues.apache.org/jira/browse/FLINK-13023
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>
> As a follow-up to FLINK-11480, this adds the conversion from a Hive table to 
> a table source, which is used on the data connector side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

2019-06-27 Thread GitBox
xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate 
HiveTableSource from a Hive table
URL: https://github.com/apache/flink/pull/8921
 
 
   
   
   ## What is the purpose of the change
   
   Provide an implementation for generating a HiveTableSource instance from a 
Hive table, as part of the Hive data connector work.
   
   Please note that this PR includes the changes from PR #8890.
   
   ## Brief change log
   
 - Added implementation in HiveTableFactory
 - Refactored HiveTableSource and HiveTableInputformat classes
 - Added corresponding tests
   
   ## Verifying this change
   
   This change added UT tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13025) Elasticsearch 7.x support

2019-06-27 Thread Keegan Standifer (JIRA)
Keegan Standifer created FLINK-13025:


 Summary: Elasticsearch 7.x support
 Key: FLINK-13025
 URL: https://issues.apache.org/jira/browse/FLINK-13025
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch
Affects Versions: 1.8.0
Reporter: Keegan Standifer


Elasticsearch 7.0.0 was released in April of 2019: 
[https://www.elastic.co/blog/elasticsearch-7-0-0-released]
The latest elasticsearch connector is 
[flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local 
$HOME into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506542610
 
 
   Gotcha, then +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen

2019-06-27 Thread GitBox
NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix 
DescriptiveStatisticsHistogram#getCount() not returning the number of elements 
seen
URL: https://github.com/apache/flink/pull/8886#discussion_r298395591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
 ##
 @@ -22,25 +22,30 @@
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics 
{@link DescriptiveStatistics} as a Flink {@link Histogram}.
  */
 public class DescriptiveStatisticsHistogram implements 
org.apache.flink.metrics.Histogram {
 
private final DescriptiveStatistics descriptiveStatistics;
 
+   private final AtomicLong elementsSeen = new AtomicLong();
 
 Review comment:
   That depends on how users use it - usually there are >= 1 writer threads 
(e.g. from a task, and potentially also spawned user threads) and 1 reader 
(the metrics reporter).
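
   For illustration, a minimal sketch of that threading model 
(`CountingHistogramSketch` is a simplified stand-in, not the PR's exact class):

```java
import java.util.concurrent.atomic.AtomicLong;

class CountingHistogramSketch {
	// written by >= 1 task/user threads, read by the single metrics reporter
	private final AtomicLong elementsSeen = new AtomicLong();

	void update(long value) {
		elementsSeen.incrementAndGet(); // atomic, safe under concurrent writers
		// (the real histogram would also record `value` here)
	}

	long getCount() {
		return elementsSeen.get(); // consistent count for the reporter thread
	}
}
```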


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

