[GitHub] [flink] JingsongLi commented on a change in pull request #10027: [FLINK-14549][Table SQL/Planner] Bring more detail by using logicalTy…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10027: [FLINK-14549][Table 
SQL/Planner] Bring more detail by using logicalTy…
URL: https://github.com/apache/flink/pull/10027#discussion_r340437802
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -50,21 +50,24 @@ object TableSinkUtils {
 val srcFieldTypes = query.getTableSchema.getFieldDataTypes
 val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
 
-if (srcFieldTypes.length != sinkFieldTypes.length ||
-  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
-!PlannerTypeUtils.isInteroperable(
-  fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
+val srcLogicalTypes = srcFieldTypes.map(t => fromDataTypeToLogicalType(t))
+val sinkLogicalTypes = sinkFieldTypes.map(t => 
fromDataTypeToLogicalType(t))
+
+if (srcLogicalTypes.length != sinkLogicalTypes.length ||
+  srcLogicalTypes.zip(sinkLogicalTypes).exists {
+case (srcType, sinkType) =>
+  !PlannerTypeUtils.isInteroperable(srcType, sinkType)
 
 Review comment:
   The difference of `areTypesCompatible` is ignore char precision too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce 
NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#issuecomment-546219804
 
 
   
   ## CI report:
   
   * f47a9f60dff6710ce5a7d5fe341a94d0fffb2d6d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133499201)
   * 501e86a6e9e8eab7fc26f030d284268d530e093e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134129471)
   * 3e3a090cfc7d9216701b68664e2c8fa4f34861f7 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on issue #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-29 Thread GitBox
wsry commented on issue #9993: [FLINK-14498][runtime]Introduce 
NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#issuecomment-547740869
 
 
   Thanks @zhijiangW. I have updated the PR and all the concerns are addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14565) Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed

2019-10-29 Thread Zili Chen (Jira)
Zili Chen created FLINK-14565:
-

 Summary: Shutdown SystemResourcesCounter on (JM|TM)MetricGroup 
closed
 Key: FLINK-14565
 URL: https://issues.apache.org/jira/browse/FLINK-14565
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Reporter: Zili Chen
Assignee: Zili Chen


Currently, we start SystemResourcesCounter when initialize (JM|TM)MetricGroup. 
This thread doesn't exit on (JM|TM)MetricGroup closed and even there is not 
exit logic of them.

It possibly causes thread leak. For example, on our platform which supports 
previewing sample SQL execution, it starts a MiniCluster in the same process as 
the platform. When the preview job finished MiniCluster closed and also 
(JM|TM)MetricGroup. However these SystemResourcesCounter threads remain.

I propose when creating SystemResourcesCounter, track it in (JM|TM)MetricGroup, 
and on (JM|TM)MetricGroup closed, shutdown SystemResourcesCounter. This way, we 
survive from thread leaks.

CC [~chesnay] [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on a change in pull request #10027: [FLINK-14549][Table SQL/Planner] Bring more detail by using logicalTy…

2019-10-29 Thread GitBox
wuchong commented on a change in pull request #10027: [FLINK-14549][Table 
SQL/Planner] Bring more detail by using logicalTy…
URL: https://github.com/apache/flink/pull/10027#discussion_r340429543
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -50,21 +50,24 @@ object TableSinkUtils {
 val srcFieldTypes = query.getTableSchema.getFieldDataTypes
 val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
 
-if (srcFieldTypes.length != sinkFieldTypes.length ||
-  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
-!PlannerTypeUtils.isInteroperable(
-  fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
+val srcLogicalTypes = srcFieldTypes.map(t => fromDataTypeToLogicalType(t))
+val sinkLogicalTypes = sinkFieldTypes.map(t => 
fromDataTypeToLogicalType(t))
+
+if (srcLogicalTypes.length != sinkLogicalTypes.length ||
+  srcLogicalTypes.zip(sinkLogicalTypes).exists {
+case (srcType, sinkType) =>
+  !PlannerTypeUtils.isInteroperable(srcType, sinkType)
 
 Review comment:
   Is `PlannerTypeUtils.isInteroperable` the same with 
`LogicalTypeChecks.areTypesCompatible`? Could we use `areTypesCompatible` here? 
@JingsongLi 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] buptljy commented on a change in pull request #10037: [FLINK-14561] Don't write FLINK_PLUGINS_DIR env variable to Configuration

2019-10-29 Thread GitBox
buptljy commented on a change in pull request #10037: [FLINK-14561] Don't write 
FLINK_PLUGINS_DIR env variable to Configuration
URL: https://github.com/apache/flink/pull/10037#discussion_r340403417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
 ##
 @@ -75,12 +77,7 @@ public void configure(ContainerSpecification container) 
throws IOException {
addPathRecursively(flinkBinPath, TARGET_ROOT, container);
addPathRecursively(flinkConfPath, TARGET_ROOT, container);
addPathRecursively(flinkLibPath, TARGET_ROOT, container);
-   if (flinkPluginsPath.isDirectory()) {
-   addPathRecursively(flinkPluginsPath, TARGET_ROOT, 
container);
-   }
-   else {
-   LOG.warn("The plugins directory '" + flinkPluginsPath + 
"' doesn't exist.");
-   }
+   
flinkPluginsPath.ifPresent(FunctionUtils.uncheckedConsumer(pluginsPath -> 
addPathRecursively(pluginsPath, TARGET_ROOT, container)));
 
 Review comment:
   It looks like we change the semantics here. I think `isDirectory` check 
includes both existence check and directory check.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10013: [FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink planner stream mode

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10013: 
[FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink 
planner stream mode
URL: https://github.com/apache/flink/pull/10013#discussion_r340428623
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -747,7 +747,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): 
Boolean = {
 aggCalls
   .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
 
 Review comment:
   I think it is very old JIRA... At that time, it is a different root cause, 
but for now, it is a new root cause.
   After add it case to hive, we can ensure the right logic.
   I think I can update JIRA to new root cause. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the number of bytes from input parameters in HybridMemorySegment

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the 
number of bytes from input parameters in HybridMemorySegment
URL: https://github.com/apache/flink/pull/8194#issuecomment-545319242
 
 
   
   ## CI report:
   
   * 29a887134fd9564f67c3698d187f36ea9148cf32 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133141926)
   * e96918866fda416260e3291e6719c4fd8420386f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134127670)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-14539) Unique key metadata should be ketp when using concat or concat_ws in some cases

2019-10-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962681#comment-16962681
 ] 

Jark Wu edited comment on FLINK-14539 at 10/30/19 4:34 AM:
---

After rethinking this issue, I think we may can't support derivation primary 
key from concat/concat_ws. For example, if we have a primary key (f0, f1, f2) 
which are all varchar type, say we have two unique records ('a', 'b', 'c') and 
('ab', '', 'c'), but the results of concat(f0, f1, f2) are the same, which 
means the concat value is not primary key anymore. 


was (Author: jark):
After rethinking this issue, I think we may can't support drivation primary key 
from concat/concat_ws. For example, if we have a primary key (f0, f1, f2) which 
are all varchar type, say we have two unique records ('a', 'b', 'c') and ('ab', 
'', 'c'), but the results of concat(f0, f1, f2) are the same, which means the 
concat value is not primary key anymore. 

> Unique key metadata should be ketp when using concat or concat_ws in some 
> cases
> ---
>
> Key: FLINK-14539
> URL: https://issues.apache.org/jira/browse/FLINK-14539
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.9.1
>Reporter: Kevin Zhang
>Assignee: Kevin Zhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently unique key metadata of a project relnode are only kept in the 
> following three situations:
> # project the child unique keys while not changing them
> # cast the child unique key when ignoring nulls and the original type of the 
> field and cast type are the same
> # rename the child unique keys
> Besides these situations, concat and concat_ws should also keep the metadata 
> if they won't break the uniqueness of the child unique keys, i.e. each 
> operands is in one of the above situations, and the operands include all the 
> child unique keys. 
> Say the f0 and f1 are the unique key fields of the child node, the following 
> sqls should keep the unique key metadata 
> {code:sql}
> select concat(f0, f1)
> -- the type of f0 and f1 are both varchar originally and ignore nulls
> select concat(cast(f0 as varchar), f1)
> select cast(concat(f0, f1) as varchar)
> {code}
> while the following sqls should discard the unique key metadata
> {code:sql}
> -- the type of f0 and f1 are both varchar originally
> select concat(cast(f0 as bigint), f1)
> select cast(concat(f0, f1) as bigint)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong edited a comment on issue #10015: [FLINK-14539][Table SQL/Planner]Unique key metadata should be ketp wh…

2019-10-29 Thread GitBox
wuchong edited a comment on issue #10015: [FLINK-14539][Table 
SQL/Planner]Unique key metadata should be ketp wh…
URL: https://github.com/apache/flink/pull/10015#issuecomment-547735177
 
 
   After rethinking this issue, I think we may can't support derivation primary 
key from concat/concat_ws. For example, if we have a primary key (f0, f1, f2) 
which are all varchar type, say we have two unique records ('a', 'b', 'c') and 
('ab', '', 'c'), but the results of concat(f0, f1, f2) are the same, which 
means the concat value is not primary key anymore. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14539) Unique key metadata should be ketp when using concat or concat_ws in some cases

2019-10-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962681#comment-16962681
 ] 

Jark Wu commented on FLINK-14539:
-

After rethinking this issue, I think we may can't support drivation primary key 
from concat/concat_ws. For example, if we have a primary key (f0, f1, f2) which 
are all varchar type, say we have two unique records ('a', 'b', 'c') and ('ab', 
'', 'c'), but the results of concat(f0, f1, f2) are the same, which means the 
concat value is not primary key anymore. 

> Unique key metadata should be ketp when using concat or concat_ws in some 
> cases
> ---
>
> Key: FLINK-14539
> URL: https://issues.apache.org/jira/browse/FLINK-14539
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.9.1
>Reporter: Kevin Zhang
>Assignee: Kevin Zhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently unique key metadata of a project relnode are only kept in the 
> following three situations:
> # project the child unique keys while not changing them
> # cast the child unique key when ignoring nulls and the original type of the 
> field and cast type are the same
> # rename the child unique keys
> Besides these situations, concat and concat_ws should also keep the metadata 
> if they won't break the uniqueness of the child unique keys, i.e. each 
> operands is in one of the above situations, and the operands include all the 
> child unique keys. 
> Say the f0 and f1 are the unique key fields of the child node, the following 
> sqls should keep the unique key metadata 
> {code:sql}
> select concat(f0, f1)
> -- the type of f0 and f1 are both varchar originally and ignore nulls
> select concat(cast(f0 as varchar), f1)
> select cast(concat(f0, f1) as varchar)
> {code}
> while the following sqls should discard the unique key metadata
> {code:sql}
> -- the type of f0 and f1 are both varchar originally
> select concat(cast(f0 as bigint), f1)
> select cast(concat(f0, f1) as bigint)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on issue #10015: [FLINK-14539][Table SQL/Planner]Unique key metadata should be ketp wh…

2019-10-29 Thread GitBox
wuchong commented on issue #10015: [FLINK-14539][Table SQL/Planner]Unique key 
metadata should be ketp wh…
URL: https://github.com/apache/flink/pull/10015#issuecomment-547735177
 
 
   After rethinking this issue, I think we may can't support drivation primary 
key from concat/concat_ws. For example, if we have a primary key (f0, f1, f2) 
which are all varchar type, say we have two unique records ('a', 'b', 'c') and 
('ab', '', 'c'), but the results of concat(f0, f1, f2) are the same, which 
means the concat value is not primary key anymore. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340426797
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
 
 Review comment:
   Hi lirui, 
   1. the current implementation is not strange. You can take a look to 
`LocalDate.equals`, it is common to use `compareTo` to implement `equals`.
   2. I mean just use `mills == that.mills && nanos == that.nanos`, it is just 
a suggestion. 
   when will compareTo returns 0 and equals returns false?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12624) Make Flink case-insensitive to meta object names

2019-10-29 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-12624:


Assignee: (was: Bowen Li)

> Make Flink case-insensitive to meta object names
> 
>
> Key: FLINK-12624
> URL: https://issues.apache.org/jira/browse/FLINK-12624
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently Flink is case-sensitive to meta object names. For example, for 
> table names, "t1" and "T1" are two distinct tables in Flink.
> There are two main reasons to make Flink case-insensitive to such names.
> 1) To make it more user-friendly for SQL users. Most popular databases 
> nowadays are case insensitive to meta object names, to tolerate user typos in 
> SQL commands and reduce usage friction
> 2) We chose Hive metastore as the main persistent catalog, and Hive metastore 
> is case insensitive.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on issue #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on issue #10029:  [FLINK-14553][runtime] Respect 
non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#issuecomment-547733858
 
 
   Thanks for the review and good suggestions @pnowojski !
   
   Concerning of refactor of `getAvailableFuture()` and `isAvailable()`, I 
explained my thought in inline comments.
   
   I remembered there were some discussions for this issue before. But now I 
find it is not very convenient to judge the available state via checking 
whether the future is completed or not in some cases. Because the returned 
`CompleteFuture` is mainly used for callback action, but if we only want to 
know the current state and do not need the requirement of monitoring the state 
for callback in some cases, then the previous way is heave-weight to some 
extent.

   I agree it would bring some overheads for outside implementations like 
`SourceReader`. But if we can explain the semantics of two methods clearly, 
then they can be orthogonal.
   
   > I think this PR can not be merged until we fix/migrate the back-pressure 
monitor to use the availability future?
   
   Exactly, it seems not much conflicts to prelaunch this PR, so I focus on it 
firstly and the back-pressure monitor would be done by yingjie.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14560) The value of taskmanager.memory.size in flink-conf.yaml is set to zero will cause taskmanager not to work

2019-10-29 Thread fa zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962672#comment-16962672
 ] 

fa zheng commented on FLINK-14560:
--

[~jark], i think it's a little unreasonable.  zero is same as your default 
value in flink, but cause an error.  We find this issue because our hadoop 
system provide a  web to config some important settings when installing flink 
service. taskmanager.memory.size will be set to zero as default according to 
your default value. I think it will be better if we unify the meanning of 0 and 
0b . 

> The value of taskmanager.memory.size in flink-conf.yaml is set to zero will 
> cause taskmanager not to work 
> --
>
> Key: FLINK-14560
> URL: https://issues.apache.org/jira/browse/FLINK-14560
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.0, 1.9.1
>Reporter: fa zheng
>Priority: Minor
> Fix For: 1.10.0, 1.9.2
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> If you accidentally  set taskmanager.memory.size: 0 in flink-conf.yaml, flink 
> should take a fixed ratio with respect to the size of the task manager JVM. 
> The relateted codes are in TaskManagerServicesConfiguration.fromConfiguration
> {code:java}
> //代码占位符
> // extract memory settings
> long configuredMemory;
> String managedMemorySizeDefaultVal = 
> TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
> if 
> (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
>  {
>try {
>   configuredMemory = 
> MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
>  MEGA_BYTES).getMebiBytes();
>} catch (IllegalArgumentException e) {
>   throw new IllegalConfigurationException(
>  "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
>}
> } else {
>configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
> }{code}
> However, in FlinkYarnSessionCli.java, flink will translate the value to byte.
> {code:java}
> //代码占位符
> // JobManager Memory
> final int jobManagerMemoryMB = 
> ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
> // Task Managers memory
> final int taskManagerMemoryMB = 
> ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
> {code}
>  
> As a result, 0 will translate to 0 b and is different from default value.  0 
> b will cause a error in following check code
> {code:java}
> //代码占位符
> checkConfigParameter(
>
> configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
>  ||
>   configuredMemory > 0, configuredMemory,
>TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
>"MemoryManager needs at least one MB of memory. " +
>   "If you leave this config parameter empty, the system automatically " +
>   "pick a fraction of the available memory.");
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task state updating to only notify scheduler about state changes that really happened

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task 
state updating to only notify scheduler about state changes that really happened
URL: https://github.com/apache/flink/pull/10038#issuecomment-547556449
 
 
   
   ## CI report:
   
   * 6f3d22fe8a4c9beb37e72f69d87f670d3dc2dbb5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134056920)
   * a3b19a3ecd42c994e2f4b095d5091971318835ab : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134124269)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
lirui-apache commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340424741
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
 
 Review comment:
   Why do we have to use `==`? I think it'll be strange if compareTo returns 0 
and equals returns false.
   BTW, I think we should also implement `hashCode`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce 
NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#issuecomment-546219804
 
 
   
   ## CI report:
   
   * f47a9f60dff6710ce5a7d5fe341a94d0fffb2d6d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133499201)
   * 501e86a6e9e8eab7fc26f030d284268d530e093e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134129471)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340423539
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext 
context) throws Exception {
if (status == InputStatus.END_OF_INPUT) {
context.allActionsCompleted();
}
-   else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+   CompletableFuture jointFuture = 
getInputOutputJointFuture(status);
+   if (jointFuture != null) {
SuspendedMailboxDefaultAction suspendedDefaultAction = 
context.suspendDefaultAction();
-   
inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+   jointFuture.thenRun(suspendedDefaultAction::resume);
+   }
+   }
+
+   /**
+* @return a combination of input and output futures if at-least one 
future of them is not
+* completed, otherwise return null if all input and outputs are 
available.
+*/
+   private CompletableFuture getInputOutputJointFuture(InputStatus 
status) {
+   if (status == InputStatus.MORE_AVAILABLE && 
isOutputAvailable()) {
+   return null;
+   }
+
+   int length = recordWriters.size();
+   for (int i = 0; i < length; i++) {
+   inputOutputFutures[i] = 
recordWriters.get(i).getAvailableFuture();
+   }
+   inputOutputFutures[length] = 
inputProcessor.getAvailableFuture();
+   return CompletableFuture.allOf(inputOutputFutures);
+   }
+
+   /**
+* @return true if all the record writers are available.
+*/
+   private boolean isOutputAvailable() {
+   for (RecordWriter recordWriter : recordWriters) {
+   if (!recordWriter.isAvailable()) {
 
 Review comment:
   I ever considered dropping the condition of `isBlocking` before. But 
considering that `LocalBufferPool#isAvailable` is actually not used in input 
side, so I retain this condition to not impact the input side. 
   
   After thinking through it now, it is not very clean because during 
`LocalBufferPool#recycle` we could not distinguish whether it is for input or 
output usages. 
   
   We can remove this condition here and also adjust the interaction between 
`RemoteInputChannel` and `LocalBufferPool` to make use of `isAvailable` way 
instead of `BufferListener` interface. WDYT?
   
   Concerning of the synchronised `recordWriter.isAvailable()`, it is actually 
the same case as other previous usages. The unavailable state is always visible 
for mailbox/task thread because the `LocalBufferPool` might become unavailable 
only after task thread requesting. For the visibility from unavailable to 
available, it is caused by other threads, then it can be still visible for task 
thread via touching volatile variable in `CompleteFuture.complete` as we 
confirmed before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10013: [FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink planner stream mode

2019-10-29 Thread GitBox
wuchong commented on a change in pull request #10013: 
[FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink 
planner stream mode
URL: https://github.com/apache/flink/pull/10013#discussion_r340423132
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -747,7 +747,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): 
Boolean = {
 aggCalls
   .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
 
 Review comment:
   But from the exception stack attched in the JIRA, it seems that this is not 
where the exception throws. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10013: [FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink planner stream mode

2019-10-29 Thread GitBox
wuchong commented on a change in pull request #10013: 
[FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink 
planner stream mode
URL: https://github.com/apache/flink/pull/10013#discussion_r340423132
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -747,7 +747,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): 
Boolean = {
 aggCalls
   .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
 
 Review comment:
   But from the exception stack attached in the JIRA, it seems that this is not 
where the exception throws. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #9993: [FLINK-14498][runtime]Introduce 
NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#issuecomment-546219804
 
 
   
   ## CI report:
   
   * f47a9f60dff6710ce5a7d5fe341a94d0fffb2d6d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133499201)
   * 501e86a6e9e8eab7fc26f030d284268d530e093e : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the number of bytes from input parameters in HybridMemorySegment

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the 
number of bytes from input parameters in HybridMemorySegment
URL: https://github.com/apache/flink/pull/8194#issuecomment-545319242
 
 
   
   ## CI report:
   
   * 29a887134fd9564f67c3698d187f36ea9148cf32 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133141926)
   * e96918866fda416260e3291e6719c4fd8420386f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134127670)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14560) The value of taskmanager.memory.size in flink-conf.yaml is set to zero will cause taskmanager not to work

2019-10-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962660#comment-16962660
 ] 

Jark Wu commented on FLINK-14560:
-

Hi [~faaronzheng], I'm wondering it maybe not a bug. As the exception says, if 
you leave the config empty, Flink will automatically pick a ratio. 
However, the config is set to 0 now, and 0 is an invalid value. So I think the 
exception is as expected. 

> The value of taskmanager.memory.size in flink-conf.yaml is set to zero will 
> cause taskmanager not to work 
> --
>
> Key: FLINK-14560
> URL: https://issues.apache.org/jira/browse/FLINK-14560
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.0, 1.9.1
>Reporter: fa zheng
>Priority: Minor
> Fix For: 1.10.0, 1.9.2
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> If you accidentally  set taskmanager.memory.size: 0 in flink-conf.yaml, flink 
> should take a fixed ratio with respect to the size of the task manager JVM. 
> The relateted codes are in TaskManagerServicesConfiguration.fromConfiguration
> {code:java}
> //代码占位符
> // extract memory settings
> long configuredMemory;
> String managedMemorySizeDefaultVal = 
> TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
> if 
> (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
>  {
>try {
>   configuredMemory = 
> MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
>  MEGA_BYTES).getMebiBytes();
>} catch (IllegalArgumentException e) {
>   throw new IllegalConfigurationException(
>  "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
>}
> } else {
>configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
> }{code}
> However, in FlinkYarnSessionCli.java, flink will translate the value to byte.
> {code:java}
> //代码占位符
> // JobManager Memory
> final int jobManagerMemoryMB = 
> ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
> // Task Managers memory
> final int taskManagerMemoryMB = 
> ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
> {code}
>  
> As a result, 0 will translate to 0 b and is different from default value.  0 
> b will cause a error in following check code
> {code:java}
> //代码占位符
> checkConfigParameter(
>
> configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
>  ||
>   configuredMemory > 0, configuredMemory,
>TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
>"MemoryManager needs at least one MB of memory. " +
>   "If you leave this config parameter empty, the system automatically " +
>   "pick a fraction of the available memory.");
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14560) The value of taskmanager.memory.size in flink-conf.yaml is set to zero will cause taskmanager not to work

2019-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-14560:
---

Assignee: (was: fa zheng)

> The value of taskmanager.memory.size in flink-conf.yaml is set to zero will 
> cause taskmanager not to work 
> --
>
> Key: FLINK-14560
> URL: https://issues.apache.org/jira/browse/FLINK-14560
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.0, 1.9.1
>Reporter: fa zheng
>Priority: Minor
> Fix For: 1.10.0, 1.9.2
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> If you accidentally  set taskmanager.memory.size: 0 in flink-conf.yaml, flink 
> should take a fixed ratio with respect to the size of the task manager JVM. 
> The relateted codes are in TaskManagerServicesConfiguration.fromConfiguration
> {code:java}
> //代码占位符
> // extract memory settings
> long configuredMemory;
> String managedMemorySizeDefaultVal = 
> TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
> if 
> (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
>  {
>try {
>   configuredMemory = 
> MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
>  MEGA_BYTES).getMebiBytes();
>} catch (IllegalArgumentException e) {
>   throw new IllegalConfigurationException(
>  "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
>}
> } else {
>configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
> }{code}
> However, in FlinkYarnSessionCli.java, flink will translate the value to byte.
> {code:java}
> //代码占位符
> // JobManager Memory
> final int jobManagerMemoryMB = 
> ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
> // Task Managers memory
> final int taskManagerMemoryMB = 
> ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
> {code}
>  
> As a result, 0 will translate to 0 b and is different from default value.  0 
> b will cause a error in following check code
> {code:java}
> //代码占位符
> checkConfigParameter(
>
> configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
>  ||
>   configuredMemory > 0, configuredMemory,
>TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
>"MemoryManager needs at least one MB of memory. " +
>   "If you leave this config parameter empty, the system automatically " +
>   "pick a fraction of the available memory.");
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340418087
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext 
context) throws Exception {
if (status == InputStatus.END_OF_INPUT) {
context.allActionsCompleted();
}
-   else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+   CompletableFuture jointFuture = 
getInputOutputJointFuture(status);
+   if (jointFuture != null) {
SuspendedMailboxDefaultAction suspendedDefaultAction = 
context.suspendDefaultAction();
-   
inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+   jointFuture.thenRun(suspendedDefaultAction::resume);
+   }
+   }
+
+   /**
+* @return a combination of input and output futures if at-least one 
future of them is not
+* completed, otherwise return null if all input and outputs are 
available.
+*/
+   private CompletableFuture getInputOutputJointFuture(InputStatus 
status) {
+   if (status == InputStatus.MORE_AVAILABLE && 
isOutputAvailable()) {
+   return null;
+   }
+
+   int length = recordWriters.size();
+   for (int i = 0; i < length; i++) {
+   inputOutputFutures[i] = 
recordWriters.get(i).getAvailableFuture();
+   }
+   inputOutputFutures[length] = 
inputProcessor.getAvailableFuture();
+   return CompletableFuture.allOf(inputOutputFutures);
+   }
+
+   /**
+* @return true if all the record writers are available.
+*/
+   private boolean isOutputAvailable() {
+   for (RecordWriter recordWriter : recordWriters) {
 
 Review comment:
   Yeah, it makes sense to do that, then we can compare the benchmark 
performance between single `RecordWriter` and multiple `RecordWriter`s. Also it 
would thin the `StreamTask` a bit to hide two arrays.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #9974: [FLINK-14501][FLINK-14502] Decouple ClusterDescriptor/ClusterSpecification from CommandLine

2019-10-29 Thread GitBox
TisonKun commented on a change in pull request #9974: 
[FLINK-14501][FLINK-14502] Decouple ClusterDescriptor/ClusterSpecification from 
CommandLine
URL: https://github.com/apache/flink/pull/9974#discussion_r340416816
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
 ##
 @@ -709,22 +730,9 @@ private ApplicationReport startAppMaster(
systemShipFiles.add(file.getAbsoluteFile());
}
 
-   //check if there is a logback or log4j file
-   File logbackFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOGBACK_NAME);
-   final boolean hasLogback = logbackFile.exists();
-   if (hasLogback) {
-   systemShipFiles.add(logbackFile);
-   }
-
-   File log4jFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOG4J_NAME);
-   final boolean hasLog4j = log4jFile.exists();
-   if (hasLog4j) {
-   systemShipFiles.add(log4jFile);
-   if (hasLogback) {
-   // this means there is already a logback 
configuration file --> fail
-   LOG.warn("The configuration directory ('" + 
configurationDirectory + "') contains both LOG4J and " +
-   "Logback configuration files. 
Please delete or rename one of them.");
-   }
+   final String logConfigFilePath = 
configuration.getString(YarnConfigOptions.APPLICATION_LOG_CONFIG_FILE);
 
 Review comment:
   @GJL do you run in session mode or per-job mode?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340416659
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext 
context) throws Exception {
if (status == InputStatus.END_OF_INPUT) {
context.allActionsCompleted();
}
-   else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+   CompletableFuture jointFuture = 
getInputOutputJointFuture(status);
+   if (jointFuture != null) {
SuspendedMailboxDefaultAction suspendedDefaultAction = 
context.suspendDefaultAction();
-   
inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+   jointFuture.thenRun(suspendedDefaultAction::resume);
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14375) Refactor task state updating to only notify scheduler about state changes that really happened

2019-10-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14375:

Description: 
The DefaultScheduler triggers failover if a task is notified to be FAILED. 
However, in the case the multiple tasks in the same region fail together, it 
will trigger multiple failovers. The later triggered failovers are useless, 
lead to concurrent failovers and will increase the restart attempts count.

I think the deep reason for this issue is that some fake state changes are 
notified to the DefaultScheduler.
The case above is a FAILED state change from TM will turn a CANCELING vertex to 
CANCELED, and the actual state transition is to CANCELED. But a FAILED state is 
notified to DefaultScheduler.

And there can be another possible issue caused by it, that a FINISHED state 
change is notified from TM when a vertex is CANCELING. The vertex will become 
CANCELED, while its FINISHED state change will be notified to DefaultScheduler 
which may trigger downstream task scheduling.

I'd propose to fix it like this. 
 - The DefaultScheduler does not handle the state update in 
SchedulerBase#updateTaskExecutionState.
 - It only handles state transitions that really happened in ExecutionGraph (on 
Execution#transitionState or vertex reset).

Besides avoiding fake state update, it also has a few other benefits:
1. we can get rid of the hack code in Execution#processFail
2. all state changes will be notified to SchedulingStrategy, i.e. it solves 
FLINK-14233

Here's the POC of this proposal 
https://github.com/zhuzhurk/flink/commits/refactor_state_update.

  was:
The DefaultScheduler triggers failover if a task is notified to be FAILED. 
However, in the case the multiple tasks in the same region fail together, it 
will trigger multiple failovers. The later triggered failovers are useless, 
lead to concurrent failovers and will increase the restart attempts count.

I think the deep reason for this issue is that some fake state changes are 
notified to the DefaultScheduler.
The case above is a FAILED state change from TM will turn a CANCELING vertex to 
CANCELED, and the actual state transition is to CANCELED. But a FAILED state is 
notified to DefaultScheduler.

And there can be another possible issue caused by it, that a FINISHED state 
change is notified from TM when a vertex is CANCELING. The vertex will become 
CANCELED, while its FINISHED state change will be notified to DefaultScheduler 
which may trigger downstream task scheduling.

I'd propose to fix it like this. 
 - The DefaultScheduler does not handle the state update in 
SchedulerBase#updateTaskExecutionState.
 - It only handles state transitions that really happened in ExecutionGraph (on 
Execution#transitionState or vertex reset).

Besides avoiding fake state update, it also has a few other benefits:
1. we can get rid of the hack code in Execution#processFail
2. all states are possible to be notified to SchedulingStrategy, i.e. it solves 
FLINK-14233

Here's the POC of this proposal 
https://github.com/zhuzhurk/flink/commits/refactor_state_update.


> Refactor task state updating to only notify scheduler about state changes 
> that really happened
> --
>
> Key: FLINK-14375
> URL: https://issues.apache.org/jira/browse/FLINK-14375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The DefaultScheduler triggers failover if a task is notified to be FAILED. 
> However, in the case the multiple tasks in the same region fail together, it 
> will trigger multiple failovers. The later triggered failovers are useless, 
> lead to concurrent failovers and will increase the restart attempts count.
> I think the deep reason for this issue is that some fake state changes are 
> notified to the DefaultScheduler.
> The case above is a FAILED state change from TM will turn a CANCELING vertex 
> to CANCELED, and the actual state transition is to CANCELED. But a FAILED 
> state is notified to DefaultScheduler.
> And there can be another possible issue caused by it, that a FINISHED state 
> change is notified from TM when a vertex is CANCELING. The vertex will become 
> CANCELED, while its FINISHED state change will be notified to 
> DefaultScheduler which may trigger downstream task scheduling.
> I'd propose to fix it like this. 
>  - The DefaultScheduler does not handle the state update in 
> SchedulerBase#updateTaskExecutionState.
>  - It only handles state transitions that really happened in ExecutionGraph 
> (on 

[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340416530
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext 
context) throws Exception {
if (status == InputStatus.END_OF_INPUT) {
context.allActionsCompleted();
}
-   else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+   CompletableFuture jointFuture = 
getInputOutputJointFuture(status);
+   if (jointFuture != null) {
SuspendedMailboxDefaultAction suspendedDefaultAction = 
context.suspendDefaultAction();
-   
inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+   jointFuture.thenRun(suspendedDefaultAction::resume);
+   }
+   }
+
+   /**
+* @return a combination of input and output futures if at-least one 
future of them is not
+* completed, otherwise return null if all input and outputs are 
available.
+*/
+   private CompletableFuture getInputOutputJointFuture(InputStatus 
status) {
+   if (status == InputStatus.MORE_AVAILABLE && 
isOutputAvailable()) {
+   return null;
+   }
+
+   int length = recordWriters.size();
+   for (int i = 0; i < length; i++) {
+   inputOutputFutures[i] = 
recordWriters.get(i).getAvailableFuture();
+   }
+   inputOutputFutures[length] = 
inputProcessor.getAvailableFuture();
+   return CompletableFuture.allOf(inputOutputFutures);
 
 Review comment:
   I agree it would be a bit clearly to distinguish three cases, and my 
previous way was considering these three cases together via the condition of 
`if (status == InputStatus.MORE_AVAILABLE && isOutputAvailable())` to make it 
shorter.  In other words, as long as at-least one input/output is not 
available, all the futures would be squashed together(including some completed 
future).  
   
   Another concern to do so is that even though we distinguish the input with 
output, but we can not further distinguish among outputs. I mean even though 
only one `RecordWriter` is not available, but we still need to compose all the 
other completed `RecordWriter` futures to avoid constructing the array 
repeatedly.
   
   I would take your way to best-effort exclude non-relevant futures.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-14560) The value of taskmanager.memory.size in flink-conf.yaml is set to zero will cause taskmanager not to work

2019-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-14560:
---

Assignee: fa zheng

> The value of taskmanager.memory.size in flink-conf.yaml is set to zero will 
> cause taskmanager not to work 
> --
>
> Key: FLINK-14560
> URL: https://issues.apache.org/jira/browse/FLINK-14560
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.0, 1.9.1
>Reporter: fa zheng
>Assignee: fa zheng
>Priority: Minor
> Fix For: 1.10.0, 1.9.2
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> If you accidentally  set taskmanager.memory.size: 0 in flink-conf.yaml, flink 
> should take a fixed ratio with respect to the size of the task manager JVM. 
> The relateted codes are in TaskManagerServicesConfiguration.fromConfiguration
> {code:java}
> //代码占位符
> // extract memory settings
> long configuredMemory;
> String managedMemorySizeDefaultVal = 
> TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
> if 
> (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
>  {
>try {
>   configuredMemory = 
> MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
>  MEGA_BYTES).getMebiBytes();
>} catch (IllegalArgumentException e) {
>   throw new IllegalConfigurationException(
>  "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
>}
> } else {
>configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
> }{code}
> However, in FlinkYarnSessionCli.java, flink will translate the value to byte.
> {code:java}
> //代码占位符
> // JobManager Memory
> final int jobManagerMemoryMB = 
> ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
> // Task Managers memory
> final int taskManagerMemoryMB = 
> ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
> {code}
>  
> As a result, 0 will translate to 0 b and is different from default value.  0 
> b will cause a error in following check code
> {code:java}
> //代码占位符
> checkConfigParameter(
>
> configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
>  ||
>   configuredMemory > 0, configuredMemory,
>TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
>"MemoryManager needs at least one MB of memory. " +
>   "If you leave this config parameter empty, the system automatically " +
>   "pick a fraction of the available memory.");
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] TisonKun commented on a change in pull request #9974: [FLINK-14501][FLINK-14502] Decouple ClusterDescriptor/ClusterSpecification from CommandLine

2019-10-29 Thread GitBox
TisonKun commented on a change in pull request #9974: 
[FLINK-14501][FLINK-14502] Decouple ClusterDescriptor/ClusterSpecification from 
CommandLine
URL: https://github.com/apache/flink/pull/9974#discussion_r340416055
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
 ##
 @@ -709,22 +730,9 @@ private ApplicationReport startAppMaster(
systemShipFiles.add(file.getAbsoluteFile());
}
 
-   //check if there is a logback or log4j file
-   File logbackFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOGBACK_NAME);
-   final boolean hasLogback = logbackFile.exists();
-   if (hasLogback) {
-   systemShipFiles.add(logbackFile);
-   }
-
-   File log4jFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOG4J_NAME);
-   final boolean hasLog4j = log4jFile.exists();
-   if (hasLog4j) {
-   systemShipFiles.add(log4jFile);
-   if (hasLogback) {
-   // this means there is already a logback 
configuration file --> fail
-   LOG.warn("The configuration directory ('" + 
configurationDirectory + "') contains both LOG4J and " +
-   "Logback configuration files. 
Please delete or rename one of them.");
-   }
+   final String logConfigFilePath = 
configuration.getString(YarnConfigOptions.APPLICATION_LOG_CONFIG_FILE);
 
 Review comment:
   Related commit 2eba494890f7aa5056ce6ddf8ddcd804e4ca07e3


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task state updating to only notify scheduler about state changes that really happened

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task 
state updating to only notify scheduler about state changes that really happened
URL: https://github.com/apache/flink/pull/10038#issuecomment-547556449
 
 
   
   ## CI report:
   
   * 6f3d22fe8a4c9beb37e72f69d87f670d3dc2dbb5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134056920)
   * a3b19a3ecd42c994e2f4b095d5091971318835ab : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134124269)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14398) Further split input unboxing code into separate methods

2019-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-14398:

Summary: Further split input unboxing code into separate methods  (was: 
Codegen produced larger-than-64kb method)

> Further split input unboxing code into separate methods
> ---
>
> Key: FLINK-14398
> URL: https://issues.apache.org/jira/browse/FLINK-14398
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.8.0
>Reporter: Hao Dang
>Assignee: Hao Dang
>Priority: Major
>  Labels: pull-request-available
> Attachments: codegen.example.txt
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> In one of our production pipelines, we have a table with 1200+ columns.  At 
> runtime, it failed due to a method inside the generated code exceeding 64kb 
> when compiled to bytecode.
> After we investigated the generated code, it appeared that the *map* method 
> inside a generated *RichMapFunction* was too long. See attached file 
> (codegen.example.txt).
> In the problematic *map* method, *result setters* were correctly split into 
> individual methods and did not have the largest footprint.
> However, there were also 1000+ input unboxing expressions inside 
> *reusableInputUnboxingExprs*, which, individually were not trivial and when 
> flattened linearly in the *map* function 
> [here|https://github.com/apache/flink/blob/release-1.8.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala#L187],
>  pushed the method size beyond 64kb in bytecode.
> We think it is worthwhile to split these input unboxing code snippets into 
> individual methods.  We were able to verify, in our production environment, 
> that splitting input unboxing code snippets into individual methods resolves 
> the issue.  Would love to hear thoughts from the team and find the best path 
> to fix it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the number of bytes from input parameters in HybridMemorySegment

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #8194: [FLINK-12216][Runtime]Respect the 
number of bytes from input parameters in HybridMemorySegment
URL: https://github.com/apache/flink/pull/8194#issuecomment-545319242
 
 
   
   ## CI report:
   
   * 29a887134fd9564f67c3698d187f36ea9148cf32 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133141926)
   * e96918866fda416260e3291e6719c4fd8420386f : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong merged pull request #10000: [FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods

2019-10-29 Thread GitBox
wuchong merged pull request #1: [FLINK-14398][SQL/Legacy Planner]Further 
split input unboxing code into separate methods
URL: https://github.com/apache/flink/pull/1
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10000: [FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods

2019-10-29 Thread GitBox
wuchong commented on a change in pull request #1: [FLINK-14398][SQL/Legacy 
Planner]Further split input unboxing code into separate methods
URL: https://github.com/apache/flink/pull/1#discussion_r340414436
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ##
 @@ -870,6 +871,45 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProjectionWithManyColumns(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+StreamITCase.clear
+
+// force code split
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
 Review comment:
   Thanks. I think it's fine to keep the config here. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10013: [FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink planner stream mode

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10013: 
[FLINK-13869][table-planner-blink][hive] Hive functions can not work in blink 
planner stream mode
URL: https://github.com/apache/flink/pull/10013#discussion_r340413622
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -747,7 +747,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): 
Boolean = {
 aggCalls
   .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+  .map(e => 
e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
 
 Review comment:
   Hi @wuchong , the fix just have this line.
   The hive function should been make by correct `constants` and `argTypes`. 
Otherwise it will throw an exception. (See `HiveAggSqlFunction`)
   In this `isTableAggregate`, it just want to check the agg class type, so the 
better way is get the function instead of make a function.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on issue #8194: [FLINK-12216][Runtime]Respect the number of bytes from input parameters in HybridMemorySegment

2019-10-29 Thread GitBox
liyafan82 commented on issue #8194: [FLINK-12216][Runtime]Respect the number of 
bytes from input parameters in HybridMemorySegment
URL: https://github.com/apache/flink/pull/8194#issuecomment-547718225
 
 
   > @liyafan82 any thought?
   
   @azagrebin I agree with you. The loop in the else case is actually useless. 
   I have revised the code accordingly. Please take a look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yangjf2019 commented on issue #9764: [FLINK-12939][docs-zh] Translate "Apache Kafka Connector" page into Chinese

2019-10-29 Thread GitBox
yangjf2019 commented on issue #9764: [FLINK-12939][docs-zh] Translate "Apache 
Kafka Connector" page into Chinese
URL: https://github.com/apache/flink/pull/9764#issuecomment-547718034
 
 
   Hi, @wuchong Thanks for your review, and left some comment in your free 
time.I will check it as soon as my possible.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340411700
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
 
 Review comment:
   ```
   public LocalDateTime toLocalDateTime() {
int date = (int) (milliseconds / MILLIS_PER_DAY);
int time = (int) (milliseconds % 

[GitHub] [flink] wangyang0918 commented on a change in pull request #10037: [FLINK-14561] Don't write FLINK_PLUGINS_DIR env variable to Configuration

2019-10-29 Thread GitBox
wangyang0918 commented on a change in pull request #10037: [FLINK-14561] Don't 
write FLINK_PLUGINS_DIR env variable to Configuration
URL: https://github.com/apache/flink/pull/10037#discussion_r340410753
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
 ##
 @@ -2012,6 +2012,9 @@
/** The environment variable name which contains the location of the 
plugins folder. */
public static final String ENV_FLINK_PLUGINS_DIR = "FLINK_PLUGINS_DIR";
 
+   /** The default Flink plugins directory is none has been specified via 
{@link #ENV_FLINK_PLUGINS_DIR}. */
 
 Review comment:
   Is it a typo? is -> if
   /** The default Flink plugins directory if none has been specified via 
{@link #ENV_FLINK_PLUGINS_DIR}. */
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340411700
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
 
 Review comment:
   ```
   public LocalDateTime toLocalDateTime() {
int date = (int) (milliseconds / MILLIS_PER_DAY);
int time = (int) (milliseconds % 

[GitHub] [flink] wangyang0918 commented on a change in pull request #10037: [FLINK-14561] Don't write FLINK_PLUGINS_DIR env variable to Configuration

2019-10-29 Thread GitBox
wangyang0918 commented on a change in pull request #10037: [FLINK-14561] Don't 
write FLINK_PLUGINS_DIR env variable to Configuration
URL: https://github.com/apache/flink/pull/10037#discussion_r340411452
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
 ##
 @@ -55,23 +55,22 @@ private PluginConfig(Optional pluginsPath, String[] 
alwaysParentFirstPatte
 
public static PluginConfig fromConfiguration(Configuration 
configuration) {
return new PluginConfig(
-   getPluginsDirPath(configuration),
+   getPluginsDir().map(File::toPath),

CoreOptions.getParentFirstLoaderPatterns(configuration));
}
 
-   private static Optional getPluginsDirPath(Configuration 
configuration) {
-   String pluginsDir = 
configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null);
-   if (pluginsDir == null) {
-   LOG.info("Environment variable [{}] is not set", 
ConfigConstants.ENV_FLINK_PLUGINS_DIR);
-   return Optional.empty();
-   }
+   public static Optional getPluginsDir() {
+   String pluginsDir = System.getenv().getOrDefault(
+   ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+   ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS);
+
File pluginsDirFile = new File(pluginsDir);
if (!pluginsDirFile.isDirectory()) {
LOG.warn("Environment variable [{}] is set to [{}] but 
the directory doesn't exist",
 
 Review comment:
   If the env is not set and DEFAULT_FLINK_PLUGINS_DIRS does not exist, it will 
show a confusing log. 
   `Environment variable [FLINK_PLUGINS_DIR] is set to [plugins] but the 
directory doesn't exist`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340411900
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 ##
 @@ -187,7 +197,8 @@ private static void 
releasePartitionsQuietly(ResultSubpartition[] partitions, in
numberOfSubpartitions * 
networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
// If the partition type is back pressure-free, we 
register with the buffer pool for
// callbacks to release memory.
-   return 
bufferPoolFactory.createBufferPool(numberOfSubpartitions,
+   return bufferPoolFactory.createBufferPool(
+   numberOfSubpartitions + 1,
 
 Review comment:
   You are right, and my previous thought was also for squashing these two 
commits together finally, because it would cause potential stuck problem if 
only one commit merged. It is only for review purpose to make it separate now.
   I planed to add new tests for cover the changes, and it seems no previous 
tests to verify the minimum size of pool.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340411434
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
 
 Review comment:
   Now, nanos has no mills now. It must be 0~99, so it should never be 
negative.
   We don't need to do everything same as `java.sql.Timestamp`, it has 
different usage 

[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340411311
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityProvider.java
 ##
 @@ -29,10 +29,30 @@
 public interface AvailabilityProvider {
/**
 * Constant that allows to avoid volatile checks {@link 
CompletableFuture#isDone()}. Check
-* {@link #getAvailableFuture()} for more explanation.
+* {@link #isAvailable()} and {@link #isVolatileAvailable()}for more 
explanation.
 */
CompletableFuture AVAILABLE = 
CompletableFuture.completedFuture(null);
 
+   /**
+* Checks whether this instance is available via constant {@link 
#AVAILABLE} to avoid volatile access.
+*
+* @return true if this instance is available for further processing.
+*/
 
 Review comment:
   Exactly I considered doing that before, but I wonder it might bring 
confusing if not clearly describe the scenarios.  I would try to explain it 
more.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task state updating to only notify scheduler about state changes that really happened

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #10038: [FLINK-14375][runtime] Refactor task 
state updating to only notify scheduler about state changes that really happened
URL: https://github.com/apache/flink/pull/10038#issuecomment-547556449
 
 
   
   ## CI report:
   
   * 6f3d22fe8a4c9beb37e72f69d87f670d3dc2dbb5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134056920)
   * a3b19a3ecd42c994e2f4b095d5091971318835ab : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #10016: [FLINK-14547][table-planner-blink] Fix UDF cannot be in the join condition in blink planner

2019-10-29 Thread GitBox
wuchong commented on issue #10016: [FLINK-14547][table-planner-blink] Fix UDF 
cannot be in the join condition in blink planner
URL: https://github.com/apache/flink/pull/10016#issuecomment-547715692
 
 
   Thanks for the fixing @HuangXingBo .  I agree with @hequn8128 , a unit test 
for plan verifying is enough. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-14547) UDF cannot be in the join condition in blink planner

2019-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-14547:
---

Assignee: Huang Xingbo

> UDF cannot be in the join condition in blink planner
> 
>
> Key: FLINK-14547
> URL: https://issues.apache.org/jira/browse/FLINK-14547
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.9.1
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, UDF cannot be in the join condition in blink planner, for the 
> following example:
> val util = batchTestUtil()
> val left = util.addTableSource[(Int, Int, String)]("Table3",'a, 'b, 'c)
> val right = util.addTableSource[(Int, Int, Int, String, Long)]("Table5", 'd, 
> 'e, 'f, 'g, 'h)
> util.addFunction("Func", Func0)
> val result = left
>  .leftOuterJoin(right, "a === d && Func(a) === a + d")
>  .select("a, d")
> util.verifyExplain(result)
> The following exception will be thrown:
> java.util.NoSuchElementException: No value 
> presentjava.util.NoSuchElementException: No value present
>  at java.util.Optional.get(Optional.java:135) at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:475)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:463)
>  at 
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:121)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.lambda$visit$0(QueryOperationConverter.java:470)
>  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:472)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:463)
>  at 
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:121)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.lambda$visit$0(QueryOperationConverter.java:470)
>  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:472)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$JoinExpressionVisitor.visit(QueryOperationConverter.java:463)
>  at 
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:121)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:228)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:139)
>  at 
> org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:136)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:116)
>  at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:61)
>  at 
> org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:135)
>  at java.util.Collections$SingletonList.forEach(Collections.java:4822) at 
> 

[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340410253
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityProvider.java
 ##
 @@ -29,10 +29,30 @@
 public interface AvailabilityProvider {
/**
 * Constant that allows to avoid volatile checks {@link 
CompletableFuture#isDone()}. Check
-* {@link #getAvailableFuture()} for more explanation.
+* {@link #isAvailable()} and {@link #isVolatileAvailable()}for more 
explanation.
 */
CompletableFuture AVAILABLE = 
CompletableFuture.completedFuture(null);
 
+   /**
+* Checks whether this instance is available via constant {@link 
#AVAILABLE} to avoid volatile access.
+*
+* @return true if this instance is available for further processing.
+*/
+   default boolean isAvailable() {
 
 Review comment:
   The main motivation of the 1st commit is actually from the 3rd commit. 
   
   But there is another tiny motivation for it : my previous thought of 
`#isAvailable()` should return a boolean value exactly for better 
understanding, just like the way of `NetworkSequenceViewReader#isAvailable()` 
and `ResultSubpartitionView#isAvailable()`. If we do not bring the 3rd commit, 
it also makes sense to do the 1st commit from this point, so I make it separate.
   
   Certainly I also accept to squash these two commit together and it can also 
be interpreted. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-14559) flink sql connector for stream table stdout

2019-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-14559.
---
Resolution: Duplicate

I think this is a duplicate issue for FLINK-13900

> flink sql connector for stream table stdout
> ---
>
> Key: FLINK-14559
> URL: https://issues.apache.org/jira/browse/FLINK-14559
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Affects Versions: 1.9.1
>Reporter: xiaodao
>Priority: Minor
>  Labels: pull-request-available
>
> in some cases,we need to output table stream resuls into stdout,just for test 
> or verify sql;
>  out message format like:
>  +---+-++--
> |colName1     |colName2|colName3|
> +---+-++--
> |xiaoming123  |xm123      | name123        |
> +---+-++--
> |xiaohong       |xh123        |xh123       |
> +---+-++--



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-29 Thread GitBox
wuchong commented on a change in pull request #10001: 
[FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#discussion_r340409154
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 ##
 @@ -180,34 +180,68 @@ class AggregateITCase(
   @Test
   def testCountDistinct(): Unit = {
 val data = new mutable.MutableList[Row]
-data.+=(Row.of(JInt.valueOf(1), JLong.valueOf(1L), "A"))
-data.+=(Row.of(JInt.valueOf(2), JLong.valueOf(2L), "B"))
-data.+=(Row.of(null, JLong.valueOf(2L), "B"))
-data.+=(Row.of(JInt.valueOf(3), JLong.valueOf(2L), "B"))
-data.+=(Row.of(JInt.valueOf(4), JLong.valueOf(3L), "C"))
-data.+=(Row.of(JInt.valueOf(5), JLong.valueOf(3L), "C"))
-data.+=(Row.of(JInt.valueOf(5), JLong.valueOf(3L), null))
-data.+=(Row.of(JInt.valueOf(6), JLong.valueOf(3L), "C"))
-data.+=(Row.of(JInt.valueOf(7), JLong.valueOf(4L), "B"))
-data.+=(Row.of(JInt.valueOf(8), JLong.valueOf(4L), "A"))
-data.+=(Row.of(JInt.valueOf(9), JLong.valueOf(4L), "D"))
-data.+=(Row.of(null, JLong.valueOf(4L), null))
-data.+=(Row.of(JInt.valueOf(10), JLong.valueOf(4L), "E"))
-data.+=(Row.of(JInt.valueOf(11), JLong.valueOf(5L), "A"))
-data.+=(Row.of(JInt.valueOf(12), JLong.valueOf(5L), "B"))
-
-val rowType: RowTypeInfo = new RowTypeInfo(Types.INT, Types.LONG, 
Types.STRING)
-
-val t = failingDataSource(data)(rowType).toTable(tEnv, 'a, 'b, 'c)
+data.+=(Row.of(localDateTime("1970-01-01 00:00:01"), 
localDate("1970-01-01"),
+  mLocalTime("00:00:01"), BigDecimal(1).bigDecimal, JInt.valueOf(1), 
JLong.valueOf(1L),
+  Long.box(1L), "A"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:02"), 
localDate("1970-01-02"),
+  mLocalTime("00:00:02"), BigDecimal(2).bigDecimal, JInt.valueOf(2), 
JLong.valueOf(2L),
+  Long.box(2L), "B"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:03"), 
localDate("1970-01-03"),
+  mLocalTime("00:00:03"), BigDecimal(3).bigDecimal, null, 
JLong.valueOf(3L),
+  Long.box(2L), "B"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:04"), 
localDate("1970-01-04"),
+  mLocalTime("00:00:04"), BigDecimal(4).bigDecimal, JInt.valueOf(4), 
JLong.valueOf(4L),
+  Long.box(3L), "C"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:05"), 
localDate("1970-01-05"),
+  mLocalTime("00:00:05"), BigDecimal(5).bigDecimal, JInt.valueOf(5), 
JLong.valueOf(5L),
+  Long.box(3L), "C"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:06"), 
localDate("1970-01-06"),
+  mLocalTime("00:00:06"), BigDecimal(6).bigDecimal, JInt.valueOf(6), 
JLong.valueOf(6L),
+  Long.box(3L), "C"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:07"), 
localDate("1970-01-07"),
+  mLocalTime("00:00:07"), BigDecimal(7).bigDecimal, JInt.valueOf(7), 
JLong.valueOf(7L),
+  Long.box(4L), "B"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:08"), 
localDate("1970-01-08"),
+  mLocalTime("00:00:08"), BigDecimal(8).bigDecimal, JInt.valueOf(8), 
JLong.valueOf(8L),
+  Long.box(4L), "A"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:09"), 
localDate("1970-01-09"),
+  mLocalTime("00:00:09"), BigDecimal(9).bigDecimal, JInt.valueOf(9), 
JLong.valueOf(9L),
+  Long.box(4L), "D"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:10"), 
localDate("1970-01-10"),
+  mLocalTime("00:00:10"), BigDecimal(10).bigDecimal, null, 
JLong.valueOf(10L),
+  Long.box(4L), "E"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:11"), 
localDate("1970-01-11"),
+  mLocalTime("00:00:11"), BigDecimal(11).bigDecimal, JInt.valueOf(11), 
JLong.valueOf(11L),
+  Long.box(5L), "A"))
+data.+=(Row.of(localDateTime("1970-01-01 00:00:12"), 
localDate("1970-01-12"),
+  mLocalTime("00:00:12"), BigDecimal(12).bigDecimal, JInt.valueOf(12), 
JLong.valueOf(12L),
+  Long.box(5L), "B"))
 
 Review comment:
   Hi @docete , what do you think about this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-10-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16956735#comment-16956735
 ] 

Zhu Zhu edited comment on FLINK-14164 at 10/30/19 2:35 AM:
---

Make this issue a subtask of FLINK-10429 to also add such a metric for NG 
scheduler.

I plan to change the {{SchedulerBase}} to register a meter 'numberOfRestarts' 
to exhibit all restarts.
The meter is a {{MeterView}} and the underlying counter is determined by each 
scheduler implementation:
1. for legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} we 
added in FLINK-14206
2. for ng scheduler, it's a new counter added in {{ExecutionFailureHandler}} 
that counts all the task and global failures notified to it (based on 
FLINK-14232 global failure handling).

[~trohrmann] [~gjy] Do you think this would work? If it's Ok, could you assign 
this ticket to me?

This also helps to unblock FLINK-14373 (ZooKeeperHighAvailabilityITCase fails 
on NG scheduling).


was (Author: zhuzh):
Make this issue a subtask of FLINK-10429 to also add such a metric for NG 
scheduler.

I plan to change the {{SchedulerBase}} to register a meter 'numberOfRestarts' 
to exhibit all restarts.
The meter is a {{MeterView}} and the underlying counter is determined by each 
scheduler implementation:
1. for legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} we 
added in FLINK-14206
2. for ng scheduler, it's a new counter added in {{ExecutionFailureHandler}} 
that counts all the task and global failures notified to it (based on 
FLINK-14232 global failure handling).

[~trohrmann] [~gjy] Do you think this would work? If it's Ok, could you assign 
this ticket to me?

> Add a metric to show failover count regarding fine grained recovery
> ---
>
> Key: FLINK-14164
> URL: https://issues.apache.org/jira/browse/FLINK-14164
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Previously Flink uses restart all strategy to recover jobs from failures. And 
> the metric "fullRestart" is used to show the count of failovers.
> However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
> metric only reveals how many times the entire graph has been restarted, not 
> including the number of fine grained failure recoveries.
> As many users want to build their job alerting based on failovers, I'd 
> propose to add such a new metric {{numberOfFailures}}/{{numberOfRestarts}} 
> which also respects fine grained recoveries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-10-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16956735#comment-16956735
 ] 

Zhu Zhu edited comment on FLINK-14164 at 10/30/19 2:33 AM:
---

Make this issue a subtask of FLINK-10429 to also add such a metric for NG 
scheduler.

I plan to change the {{SchedulerBase}} to register a meter 'numberOfRestarts' 
to exhibit all restarts.
The meter is a {{MeterView}} and the underlying counter is determined by each 
scheduler implementation:
1. for legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} we 
added in FLINK-14206
2. for ng scheduler, it's a new counter added in {{ExecutionFailureHandler}} 
that counts all the task and global failures notified to it (based on 
FLINK-14232 global failure handling).

[~trohrmann] [~gjy] Do you think this would work? If it's Ok, could you assign 
this ticket to me?


was (Author: zhuzh):
Make this issue a subtask of FLINK-10429 to also add such a metric for NG 
scheduler.

I plan to change the {{SchedulerBase}} to register a meter 'numberOfRestarts' 
to exhibits all restarts.
The meter is a {{MeterView}} and the underlying counter is determined by each 
scheduler implementation:
1. for legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} we 
added in FLINK-10429
2. for ng scheduler, it's a new counter added in {{ExecutionFailureHandler}} 
that counts all the task and global failures notified to it (based on 
FLINK-14232 global failure handling).

[~trohrmann] [~gjy] Do you think this would work? If it's Ok, could you assign 
this ticket to me?

> Add a metric to show failover count regarding fine grained recovery
> ---
>
> Key: FLINK-14164
> URL: https://issues.apache.org/jira/browse/FLINK-14164
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Previously Flink uses restart all strategy to recover jobs from failures. And 
> the metric "fullRestart" is used to show the count of failovers.
> However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
> metric only reveals how many times the entire graph has been restarted, not 
> including the number of fine grained failure recoveries.
> As many users want to build their job alerting based on failovers, I'd 
> propose to add such a new metric {{numberOfFailures}}/{{numberOfRestarts}} 
> which also respects fine grained recoveries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on issue #10016: [FLINK-14547][table-planner-blink] Fix UDF cannot be in the join condition in blink planner

2019-10-29 Thread GitBox
hequn8128 commented on issue #10016: [FLINK-14547][table-planner-blink] Fix UDF 
cannot be in the join condition in blink planner
URL: https://github.com/apache/flink/pull/10016#issuecomment-547712528
 
 
   @HuangXingBo Thanks a lot for fixing the problem. The fix looks good. 
Besides, it would be better if we change the IT test to UT test. IT test is 
costful. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 removed a comment on issue #10016: [FLINK-14547][table-planner-blink] Fix UDF cannot be in the join condition in blink planner

2019-10-29 Thread GitBox
hequn8128 removed a comment on issue #10016: [FLINK-14547][table-planner-blink] 
Fix UDF cannot be in the join condition in blink planner
URL: https://github.com/apache/flink/pull/10016#issuecomment-547712528
 
 
   @HuangXingBo Thanks a lot for fixing the problem. The fix looks good. 
Besides, it would be better if we change the IT test to UT test. IT test is 
costful. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread GitBox
zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340406957
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 ##
 @@ -35,23 +35,23 @@
  * In this case {@link ResultPartitionWriter#fail(Throwable)} still needs to 
be called afterwards to fully release
  * all resources associated the the partition and propagate failure cause to 
the consumer if possible.
  */
-public interface ResultPartitionWriter extends AutoCloseable {
+public abstract class ResultPartitionWriter implements AutoCloseable, 
AvailabilityProvider {
 
 Review comment:
   The interface can only extend another interface in this case, so we have to 
redefine the duplicate methods `#isAvailable()` and `#getAvailableFuture()` for 
`ResultPartitionWriter`. If we change it to an abstract class, then it can 
implement multiple other interfaces including existing `AvailabilityProvider`. 
I would refactor this commit message a bit to make it more clearly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340406393
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
 
 Review comment:
   getMilliseconds


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340406278
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
 
 Review comment:
   I mean just use `==`, instead of compare.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340406101
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
 
 Review comment:
   I know... You can just:
   ```
   int cmp = (milliseconds - otherDate. milliseconds);
   if (cmp == 0) {
   cmp = (nanoseconds - otherDate. nanoseconds);
   }
   return cmp;
   ```
   There will be fewer instructions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340405615
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
 
 Review comment:
   So I think you can just remove mills in nanoseconds, because it is totally 
useless.
   I think we can change name to nanoOfMills, just contain micros and nanos.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340405352
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
 
 Review comment:
   But after we remove precision, it has nothing to do with precision.
   Actually, hive has Timestamp class too, so I think Timestamp is OK.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
JingsongLi commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340405012
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
 
 Review comment:
   There is no need to add this int to Timestamp just for check. Actually, 
Timestamp class can do everything without this field.
   This check can be add an assert to check nanos is 0 when precision less or 
equal than 3.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14382) Incorrect handling of FLINK_PLUGINS_DIR on Yarn

2019-10-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962640#comment-16962640
 ] 

Yang Wang commented on FLINK-14382:
---

Hi [~trohrmann],
I agree with you that we should not set the env `FLINK_PLUGIN_DIR` in flink 
configuration. Instead, make the `PluginConfig` to get the plugin directory 
from environment. So we will need to set the `FLINK_PLUGIN_DIR` environment in 
Yarn and mesos submission.

This jira will focus on only ship plugins on Yarn and check the plugin 
mechanism that classes of different plugins are isolated.

> Incorrect handling of FLINK_PLUGINS_DIR on Yarn
> ---
>
> Key: FLINK-14382
> URL: https://issues.apache.org/jira/browse/FLINK-14382
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Blocker
> Fix For: 1.10.0, 1.9.2
>
>
> *(This ticket is a blocker for 1.10 release while critical for 1.9.x)*
> When creating and starting up the yarn containers there are two issues with 
> how the {{FLINK_PLUGINS_DIR}} is being handled.
>  # Content of the {{plugins}} directory is currently added to the class path, 
> braking the encapsulation of the plugins from one another
>  # {{FLINK_PLUGINS_DIR}} is passed to the container as an absolute path as 
> seen by the client. Because of that TaskManager or JobManager can not use it.
> Both bugs are probably contained to {{YarnClusterDescriptor#startAppMaster}} 
> method (which calls relevant {{addEnvironmentFoldersToShipFiles}} and 
> {{uploadAndRegisterFiles}} methods)
> (original description)
> If we do not set FLINK_PLUGINS_DIR in flink-conf.yaml, it will be set to 
> [flink 
> configuration|https://github.com/apache/flink/blob/9e6ff81e22d6f5f04abb50ca1aea84fd2542bf9d/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L158]
>  according to the environment.
> In yarn mode, the local path will be set in flink-conf.yaml and used by 
> jobmanager and taskmanager. We will find the warning log like below.
> {code:java}
> 2019-10-12 19:24:58,165 WARN  org.apache.flink.core.plugin.PluginConfig   
>   - Environment variable [FLINK_PLUGINS_DIR] is set to 
> [/Users/wangy/IdeaProjects/apache-flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins]
>  but the directory doesn't exist
> {code}
> It was in introduced by FLINK-12143.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340402381
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
 
 Review comment:
   oops... I was working on something else and it must've been the context 
switching.. I will resolve this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340401995
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
+   (milliseconds / 1000) : (milliseconds / 1000 - 1);
+   return LocalDateTime.ofEpochSecond(epochSeconds, nanoseconds, 
ZoneOffset.UTC);
+   }
+
+   public 

[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340401570
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
+   (milliseconds / 1000) : (milliseconds / 1000 - 1);
+   return LocalDateTime.ofEpochSecond(epochSeconds, nanoseconds, 
ZoneOffset.UTC);
+   }
+
+   public 

[GitHub] [flink] haodang commented on a change in pull request #10000: [FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods

2019-10-29 Thread GitBox
haodang commented on a change in pull request #1: [FLINK-14398][SQL/Legacy 
Planner]Further split input unboxing code into separate methods
URL: https://github.com/apache/flink/pull/1#discussion_r340094625
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ##
 @@ -870,6 +871,45 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProjectionWithManyColumns(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+StreamITCase.clear
+
+// force code split
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
 Review comment:
   makes sense that functionally it will be the same after removing this 
explicit setup.
   
   i think we might want to explicitly set it so that this unit test does not 
depend on the default value of `maxGeneratedCodeLength`, which hypothetically 
can change, similarly to the test `testSelectExpressionWithSplitFromTable()`.  
Plus, we don't have any perf issue adding this.
   
   what do you think?  i'm also okay to remove it if you see that's not an 
issue, but think setting it explicitly is more robust.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] haodang commented on a change in pull request #10000: [FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods

2019-10-29 Thread GitBox
haodang commented on a change in pull request #1: [FLINK-14398][SQL/Legacy 
Planner]Further split input unboxing code into separate methods
URL: https://github.com/apache/flink/pull/1#discussion_r340094625
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ##
 @@ -870,6 +871,45 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProjectionWithManyColumns(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+StreamITCase.clear
+
+// force code split
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
 Review comment:
   makes sense that functionally it will be the same after removing this 
explicit setup.
   
   i think we might want to explicitly set it so that this unit test does not 
depend on the default value of `maxGeneratedCodeLength`, which hypothetically 
can change, similarly to the test `testSelectExpressionWithSplitFromTable()`.  
Plus, we don't have any perf issue adding this.
   
   what do you think?  i'm also okay to remove it if you see that's not an 
issue, but think setting it explicitly is more resilient.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340401179
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
+   return milliseconds;
+   }
+
+   public static PreciseTimestamp fromLong(long milliseconds, int 
precision) {
+   Timestamp t = new Timestamp(milliseconds);
+   long millis = isCompact(precision) ?
+   zeroLastNDigits(t.getTime(), 3 - precision) : 
t.getTime();
+   int nanos = (int) zeroLastNDigits((long) t.getNanos(), 9 - 
precision);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public Timestamp toTimestamp() {
+   return Timestamp.valueOf(this.toString());
+   }
+
+   public static PreciseTimestamp fromTimestamp(Timestamp t, int 
precision) {
+   String timestampString = t.toString();
+   long millis = timestampStringToUnixDate(timestampString);
+   int nanos = timestampStringToNanoseconds(timestampString);
+   return new PreciseTimestamp(millis, nanos, precision);
+   }
+
+   public LocalDateTime toLocalDateTime() {
+   long epochSeconds = (milliseconds >= 0 || nanoseconds == 0) ?
 
 Review comment:
   the nanoseconds should between 0-999_999_999.
   so given a negative long (-123) as milliseconds, the epoch second would be 
-1 (not 0) and the nanoseconds would be (1000_000_000 - 

[jira] [Assigned] (FLINK-14558) Fix the ClassNotFoundException issue for run python job in standalone mode

2019-10-29 Thread sunjincheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-14558:
---

Assignee: Dian Fu

> Fix the ClassNotFoundException issue for run python job in standalone mode
> --
>
> Key: FLINK-14558
> URL: https://issues.apache.org/jira/browse/FLINK-14558
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> java.lang.ClassNotFoundException: 
> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator 
> will be thrown when running a Python UDF job in a standalone cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14558) Fix the ClassNotFoundException issue for run python job in standalone mode

2019-10-29 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962632#comment-16962632
 ] 

sunjincheng commented on FLINK-14558:
-

Thanks for taking this Dian!

> Fix the ClassNotFoundException issue for run python job in standalone mode
> --
>
> Key: FLINK-14558
> URL: https://issues.apache.org/jira/browse/FLINK-14558
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> java.lang.ClassNotFoundException: 
> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator 
> will be thrown when running a Python UDF job in a standalone cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14557) Clean up the package of py4j

2019-10-29 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962631#comment-16962631
 ] 

sunjincheng commented on FLINK-14557:
-

Thanks for taking this Dian!

> Clean up the package of py4j
> 
>
> Key: FLINK-14557
> URL: https://issues.apache.org/jira/browse/FLINK-14557
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently it contains a directory __MACOSX in the Py4j package. It's useless 
> and should be removed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14557) Clean up the package of py4j

2019-10-29 Thread sunjincheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-14557:
---

Assignee: Dian Fu

> Clean up the package of py4j
> 
>
> Key: FLINK-14557
> URL: https://issues.apache.org/jira/browse/FLINK-14557
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently it contains a directory __MACOSX in the Py4j package. It's useless 
> and should be removed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14556) Correct the package of cloud pickle

2019-10-29 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962630#comment-16962630
 ] 

sunjincheng commented on FLINK-14556:
-

Thanks for taking this Dian!

> Correct the package of cloud pickle
> ---
>
> Key: FLINK-14556
> URL: https://issues.apache.org/jira/browse/FLINK-14556
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the package structure of cloud pickle is as following:
> {code:java}
> cloudpickle-1.2.2/
> cloudpickle-1.2.2/cloudpickle/
> cloudpickle-1.2.2/cloudpickle/__init__.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle_fast.py 
> cloudpickle-1.2.2/LICENSE
> {code}
> It should be:
> {code:java}
> cloudpickle/ 
> cloudpickle/__init__.py  
> cloudpickle/cloudpickle.py  
> cloudpickle/cloudpickle_fast.py  
> cloudpickle/LICENSE
> {code}
> Otherwise, the following error will be thrown when running in a standalone 
> cluster :"ImportError: No module named cloudpickle".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14556) Correct the package of cloud pickle

2019-10-29 Thread sunjincheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-14556:
---

Assignee: Dian Fu

> Correct the package of cloud pickle
> ---
>
> Key: FLINK-14556
> URL: https://issues.apache.org/jira/browse/FLINK-14556
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the package structure of cloud pickle is as following:
> {code:java}
> cloudpickle-1.2.2/
> cloudpickle-1.2.2/cloudpickle/
> cloudpickle-1.2.2/cloudpickle/__init__.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle_fast.py 
> cloudpickle-1.2.2/LICENSE
> {code}
> It should be:
> {code:java}
> cloudpickle/ 
> cloudpickle/__init__.py  
> cloudpickle/cloudpickle.py  
> cloudpickle/cloudpickle_fast.py  
> cloudpickle/LICENSE
> {code}
> Otherwise, the following error will be thrown when running in a standalone 
> cluster :"ImportError: No module named cloudpickle".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14488) Update python table API with temporary objects methods

2019-10-29 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962629#comment-16962629
 ] 

sunjincheng commented on FLINK-14488:
-

Thanks for creating this JIRA. [~dwysakowicz]

PyFlink should align the Table API. I notice that the `CreateTempFunction` is 
part of this FLIP,  I think this issue can be done after adding 
`CreateTempFunction`.

What do you think?

> Update python table API with temporary objects methods
> --
>
> Key: FLINK-14488
> URL: https://issues.apache.org/jira/browse/FLINK-14488
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Table SQL / API
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.10.0
>
>
> Update python table API with new methods introduced in Java/Scala API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] qiuxiafei commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
qiuxiafei commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340400639
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
 
 Review comment:
   I remember there used to be a discussion about breeze months ago. The 
conclusion at that time was NOT to introducing it into flink ml, because it's 
written in scala and most of the time we need to write some kinda wrapper 
classes to use them. Correct me if anything wrong.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340400042
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
+   }
+
+   @Override
+   public String toString() {
+   // -MM-dd HH:mm:ss
+   StringBuilder timestampString = new 
StringBuilder(unixTimestampToString(milliseconds));
+   if (precision > 0 && nanoseconds > 0) {
+   // append fraction part
+   
timestampString.append('.').append(nonosSecondsToString(nanoseconds, 
precision));
+   }
+   return timestampString.toString();
+   }
+
+   public long toLong() {
 
 Review comment:
   What about toMilliseconds ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340399931
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
+   this.milliseconds = milliseconds;
+   this.nanoseconds = nanoseconds;
+   this.precision = precision;
+   }
+
+   @Override
+   public int compareTo(PreciseTimestamp that) {
+   return this.milliseconds == that.milliseconds ?
+   Integer.compare(this.nanoseconds, that.nanoseconds) :
+   Long.compare(this.milliseconds, that.milliseconds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (!(obj instanceof PreciseTimestamp)) {
+   return false;
+   }
+   PreciseTimestamp that = (PreciseTimestamp) obj;
+   return this.compareTo(that) == 0;
 
 Review comment:
   Do you mean if X *equals* Y, X and Y should be the same object?
   I found equals of Decimal type also depends on compareTo.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340399119
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
 
 Review comment:
   IMO Timestamp leads misunderstanding with java.sql.Timestamp


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340398550
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
 
 Review comment:
   We need this to double check the precision between values and outside type.
   It's very useful to detect outside type deriving bugs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10035: [FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…

2019-10-29 Thread GitBox
docete commented on a change in pull request #10035: 
[FLINK-14080][table-planner-blink] Introduce PreciseTimestamp as inte…
URL: https://github.com/apache/flink/pull/10035#discussion_r340397675
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/PreciseTimestamp.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
+import static 
org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
+
+/**
+ * Immutable SQL TIMESTAMP type with nanosecond precision.
+ */
+public class PreciseTimestamp implements Comparable {
+
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
+
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
+
+   private final long milliseconds;
+
+   private final int nanoseconds;
+
+   private final int precision;
+
+   private PreciseTimestamp(long milliseconds, int nanoseconds, int 
precision) {
+   Preconditions.checkArgument(precision >= MIN_PRECISION && 
precision <= MAX_PRECISION);
+   Preconditions.checkArgument(nanoseconds >= 0 && nanoseconds <= 
999_999_999);
 
 Review comment:
   Performance reason to retain milli part in **milliseconds**:
   We always need the milli part in some operations, we can perform 
"milliseconds without milli part + nanoseconds / 1000_000" once and use it 
everywhere.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14558) Fix the ClassNotFoundException issue for run python job in standalone mode

2019-10-29 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962592#comment-16962592
 ] 

Dian Fu commented on FLINK-14558:
-

[~sunjincheng121] Good catch! I think we should use the ContextClassLoader to 
load PythonScalarFunctionOperator as currently flink-python-*.jar is loaded 
with user classloader. I'd like to take this issue, could you assign it to me? 
Thanks in advance!

> Fix the ClassNotFoundException issue for run python job in standalone mode
> --
>
> Key: FLINK-14558
> URL: https://issues.apache.org/jira/browse/FLINK-14558
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Priority: Major
> Fix For: 1.10.0
>
>
> java.lang.ClassNotFoundException: 
> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator 
> will be thrown when running a Python UDF job in a standalone cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340395215
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
+   Complex unit = new Complex(Math.cos(2 * Math.PI / length),
+   Math.sin(2 * Math.PI / length));
+   omega[0] = new Complex(1, 0);
+   omega[1] = unit;
+   for (int index = 2; index < length; index++) {
+   omega[index] = omega[index - 1].multiply(unit);
+   }
+   return omega;
+   }
+
+   /**
+* Cooley-Tukey algorithm, can perform fft for any composite base.
+* Specifically, it can perform power-of-2 base fft(with some 
modification to make it an in-place algorithm).
+* See:
+* "An algorithm for the machine calculation of complex Fourier 
series", JW Cooley, JW Tukey, 1965
+* for a rough reference.
+* Detail of radix-2 in-place Cooley-Tukey algorithm can be found in 
many places, e.g. CLRS textbook.
+*/
+   public static Complex[] fftRadix2CooleyTukey(Complex[] input, boolean 
inverse, Complex[] omega) {
+
+   //1. length
+   int length = input.length;
+   int logl = (int) (Math.log(length + 0.01) * INVERSE_LOG_2);
+
+   //notice: only support power of 2
+   //fftChirpZ support other lengths
+   if ((1 << logl) != length) {
+   throw new RuntimeException("Radix-2 Cooley-Tukey only 
supports lengths of power-of-2.");
 
 Review comment:
   throw `IllegalArgumentException` or `UnsupportedOperatorException` makes 
more sense to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340395475
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
+   Complex unit = new Complex(Math.cos(2 * Math.PI / length),
+   Math.sin(2 * Math.PI / length));
+   omega[0] = new Complex(1, 0);
+   omega[1] = unit;
+   for (int index = 2; index < length; index++) {
+   omega[index] = omega[index - 1].multiply(unit);
+   }
+   return omega;
+   }
+
+   /**
+* Cooley-Tukey algorithm, can perform fft for any composite base.
+* Specifically, it can perform power-of-2 base fft(with some 
modification to make it an in-place algorithm).
+* See:
+* "An algorithm for the machine calculation of complex Fourier 
series", JW Cooley, JW Tukey, 1965
+* for a rough reference.
+* Detail of radix-2 in-place Cooley-Tukey algorithm can be found in 
many places, e.g. CLRS textbook.
+*/
+   public static Complex[] fftRadix2CooleyTukey(Complex[] input, boolean 
inverse, Complex[] omega) {
+
+   //1. length
+   int length = input.length;
+   int logl = (int) (Math.log(length + 0.01) * INVERSE_LOG_2);
+
+   //notice: only support power of 2
+   //fftChirpZ support other lengths
+   if ((1 << logl) != length) {
+   throw new RuntimeException("Radix-2 Cooley-Tukey only 
supports lengths of power-of-2.");
+   }
+
+   //2. copy data
+   Complex[] inputCopy = new Complex[length];
 
 Review comment:
   nit: Maybe renamed it to `inPlaceFFT`. inputCopy doesn't sound immediately 
clear what it does especially with such a long method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14557) Clean up the package of py4j

2019-10-29 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962591#comment-16962591
 ] 

Dian Fu commented on FLINK-14557:
-

[~sunjincheng121] Good catch! I'd like to take this issue, could you assign it 
to me? Thanks in advance!

> Clean up the package of py4j
> 
>
> Key: FLINK-14557
> URL: https://issues.apache.org/jira/browse/FLINK-14557
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently it contains a directory __MACOSX in the Py4j package. It's useless 
> and should be removed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14556) Correct the package of cloud pickle

2019-10-29 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962590#comment-16962590
 ] 

Dian Fu commented on FLINK-14556:
-

[~sunjincheng121] Thanks a lot for reporting this issue. Good catch! I'd like 
to take this issue, could you assign it to me? Thanks in advance!

> Correct the package of cloud pickle
> ---
>
> Key: FLINK-14556
> URL: https://issues.apache.org/jira/browse/FLINK-14556
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: sunjincheng
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the package structure of cloud pickle is as following:
> {code:java}
> cloudpickle-1.2.2/
> cloudpickle-1.2.2/cloudpickle/
> cloudpickle-1.2.2/cloudpickle/__init__.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle.py 
> cloudpickle-1.2.2/cloudpickle/cloudpickle_fast.py 
> cloudpickle-1.2.2/LICENSE
> {code}
> It should be:
> {code:java}
> cloudpickle/ 
> cloudpickle/__init__.py  
> cloudpickle/cloudpickle.py  
> cloudpickle/cloudpickle_fast.py  
> cloudpickle/LICENSE
> {code}
> Otherwise, the following error will be thrown when running in a standalone 
> cluster :"ImportError: No module named cloudpickle".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340394137
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
 
 Review comment:
   Can we use [breeze 
Complex](https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/math/Complex.scala)
 ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340394874
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
+   Complex unit = new Complex(Math.cos(2 * Math.PI / length),
+   Math.sin(2 * Math.PI / length));
+   omega[0] = new Complex(1, 0);
+   omega[1] = unit;
+   for (int index = 2; index < length; index++) {
+   omega[index] = omega[index - 1].multiply(unit);
+   }
+   return omega;
+   }
+
+   /**
+* Cooley-Tukey algorithm, can perform fft for any composite base.
+* Specifically, it can perform power-of-2 base fft(with some 
modification to make it an in-place algorithm).
+* See:
 
 Review comment:
   ```
   /**
* Cooley-Tukey ...
* 
* See reference for more details ...
* 
*   * "An algorithm for..."
*   * "CLRS text book"
*   * 
* 
*/
   ```
   reference should have itemized. and should separate from previous doc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add the algorithm of Fast Fourier Transformation(FFT)

2019-10-29 Thread GitBox
walterddr commented on a change in pull request #9082: [FLINK-13207][ml] Add 
the algorithm of Fast Fourier Transformation(FFT)
URL: https://github.com/apache/flink/pull/9082#discussion_r340395550
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/FFT.java
 ##
 @@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.commons.math3.complex.Complex;
+
+/**
+ * Fast Fourier Transformation(FFT).
+ * Provides 2 algorithms:
+ * 1. Cooley-Tukey algorithm, high performance, but only supports length of 
power-of-2.
+ * 2. Chirp-Z algorithm, can perform FFT with any length.
+ */
+public class FFT {
+
+   private static final double INVERSE_LOG_2 = 1.0 / Math.log(2);
+
+   /**
+* Helper for root of unity. Returns the group for power "length".
+*/
+   public static Complex[] getOmega(int length) {
+   Complex[] omega = new Complex[length];
+   Complex unit = new Complex(Math.cos(2 * Math.PI / length),
+   Math.sin(2 * Math.PI / length));
+   omega[0] = new Complex(1, 0);
+   omega[1] = unit;
+   for (int index = 2; index < length; index++) {
+   omega[index] = omega[index - 1].multiply(unit);
+   }
+   return omega;
+   }
+
+   /**
+* Cooley-Tukey algorithm, can perform fft for any composite base.
+* Specifically, it can perform power-of-2 base fft(with some 
modification to make it an in-place algorithm).
+* See:
+* "An algorithm for the machine calculation of complex Fourier 
series", JW Cooley, JW Tukey, 1965
+* for a rough reference.
+* Detail of radix-2 in-place Cooley-Tukey algorithm can be found in 
many places, e.g. CLRS textbook.
+*/
+   public static Complex[] fftRadix2CooleyTukey(Complex[] input, boolean 
inverse, Complex[] omega) {
+
+   //1. length
+   int length = input.length;
+   int logl = (int) (Math.log(length + 0.01) * INVERSE_LOG_2);
+
+   //notice: only support power of 2
+   //fftChirpZ support other lengths
+   if ((1 << logl) != length) {
+   throw new RuntimeException("Radix-2 Cooley-Tukey only 
supports lengths of power-of-2.");
+   }
+
+   //2. copy data
+   Complex[] inputCopy = new Complex[length];
+   for (int index = 0; index < length; index++) {
+   inputCopy[index] = new Complex(input[index].getReal(), 
input[index].getImaginary());
+   }
+
+   //3. bit reverse
+   int[] reverse = new int[length];
+   for (int index = 0; index < length; index++) {
+   int t = 0;
+   for (int pos = 0; pos < logl; pos++) {
+   if ((index & (1 << pos)) != 0) {
+   t |= (1 << (logl - pos - 1));
+   }
+   }
+   reverse[index] = t;
+   }
+
+   //4. reverse the input
+   for (int index = 0; index < length; index++) {
+   if (index < reverse[index]) {
+   Complex t = inputCopy[index];
+   inputCopy[index] = inputCopy[reverse[index]];
+   inputCopy[reverse[index]] = t;
+   }
+   }
+
+   //5. perform in-place fft
+   if (inverse) {
+   //inverse fft
+   for (int len = 2; len <= length; len *= 2) {
+   int mid = len / 2;
+   for (int step = 0; step < length; step += len) {
+   for (int index = 0; index < mid; 
index++) {
+   Complex t = omega[length / len 
* index]
+   
.multiply(inputCopy[step + mid + 

[GitHub] [flink] flinkbot edited a comment on issue #9988: [FLINK-14418][hive] Create HiveModule to provide Hive built-in functions

2019-10-29 Thread GitBox
flinkbot edited a comment on issue #9988: [FLINK-14418][hive] Create HiveModule 
to provide Hive built-in functions
URL: https://github.com/apache/flink/pull/9988#issuecomment-546044427
 
 
   
   ## CI report:
   
   * 1a5aaeee9aef16c74fc6f2b7e30a07175b4a52a9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133431616)
   * 79d4d8ab004633d97e0d7d77efc8159a4f19dde5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133440152)
   * ac4bce2f7aeae8c777b9b8d19ef6cd854498dd21 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133456104)
   * e54b182364c4ac57b3e5f654ce2ac509066bed01 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134096720)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13513) Add the Mapper and related classes for later algorithm implementations.

2019-10-29 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962532#comment-16962532
 ] 

Jiangjie Qin commented on FLINK-13513:
--

Checked in to master.

005bda9be361ffe6c371ebefed32e614cbacd876

> Add  the Mapper and related classes  for later algorithm implementations. 
> --
>
> Key: FLINK-13513
> URL: https://issues.apache.org/jira/browse/FLINK-13513
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: Xu Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Define the abstract classes for the single-thread executor for Row type data.
>  * Add the definition of abstract class Mapper 
>  * Add abstract class ModelMapper extends Mapper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-13513) Add the Mapper and related classes for later algorithm implementations.

2019-10-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-13513.
--
  Assignee: Xu Yang
Resolution: Implemented

> Add  the Mapper and related classes  for later algorithm implementations. 
> --
>
> Key: FLINK-13513
> URL: https://issues.apache.org/jira/browse/FLINK-13513
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: Xu Yang
>Assignee: Xu Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Define the abstract classes for the single-thread executor for Row type data.
>  * Add the definition of abstract class Mapper 
>  * Add abstract class ModelMapper extends Mapper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13513) Add the Mapper and related classes for later algorithm implementations.

2019-10-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-13513:
-
Fix Version/s: 1.10.0

> Add  the Mapper and related classes  for later algorithm implementations. 
> --
>
> Key: FLINK-13513
> URL: https://issues.apache.org/jira/browse/FLINK-13513
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: Xu Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Define the abstract classes for the single-thread executor for Row type data.
>  * Add the definition of abstract class Mapper 
>  * Add abstract class ModelMapper extends Mapper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14220) support drop temp functions

2019-10-29 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-14220.

Resolution: Duplicate

> support drop temp functions
> ---
>
> Key: FLINK-14220
> URL: https://issues.apache.org/jira/browse/FLINK-14220
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.10.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14221) support drop temp system functions and temp catalog functions

2019-10-29 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-14221:
-
Summary: support drop temp system functions and temp catalog functions  
(was: support drop temp system functions)

> support drop temp system functions and temp catalog functions
> -
>
> Key: FLINK-14221
> URL: https://issues.apache.org/jira/browse/FLINK-14221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.10.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14221) support drop temp system functions

2019-10-29 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-14221:
-
Summary: support drop temp system functions  (was: support drop and alter 
temp system functions)

> support drop temp system functions
> --
>
> Key: FLINK-14221
> URL: https://issues.apache.org/jira/browse/FLINK-14221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.10.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   5   6   7   >