[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752853592



##
File path: flink-ml-lib/pom.xml
##
@@ -106,6 +112,11 @@ under the License.
   jar
   test
 
+  <dependency>
+    <groupId>com.google.code.gson</groupId>
+    <artifactId>gson</artifactId>
+    <version>2.8.6</version>
+  </dependency>

Review comment:
   ReadWriteUtils may not satisfy my need; it is only a utility to help 
serialize model data.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752930662



##
File path: flink-ml-lib/pom.xml
##
@@ -106,6 +112,11 @@ under the License.
   jar
   test
 
+  <dependency>
+    <groupId>com.google.code.gson</groupId>
+    <artifactId>gson</artifactId>
+    <version>2.8.6</version>
+  </dependency>

Review comment:
   The JSON tool that ReadWriteUtils supplies does not support serialization 
of some classes. Here, I use Google's Gson to replace the default JSON tool, 
so that we have only one JSON tool.
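
   A minimal sketch of what the comment above describes: using Gson to 
round-trip model data that the default JSON tooling cannot handle. The field 
names (`featureCols`, `k`) are illustrative, not the actual flink-ml model-data 
schema.

```java
import com.google.gson.Gson;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class GsonModelDataSketch {
    public static void main(String[] args) {
        // Hypothetical model-data payload; LinkedHashMap keeps field order
        // deterministic so the serialized JSON is stable.
        Map<String, Object> modelData = new LinkedHashMap<>();
        modelData.put("featureCols", Arrays.asList("f0", "f1"));
        modelData.put("k", 10);

        Gson gson = new Gson();
        String json = gson.toJson(modelData);
        System.out.println(json); // {"featureCols":["f0","f1"],"k":10}

        // Round-trip back to a Map; note Gson deserializes JSON numbers
        // into Double when the target type is a raw Map.
        Map<?, ?> restored = gson.fromJson(json, Map.class);
        System.out.println(restored.get("k")); // 10.0
    }
}
```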




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17831: [FLINK-15825][Table SQL/API] Add renameDatabase() to Catalog

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17831:
URL: https://github.com/apache/flink/pull/17831#issuecomment-973310719


   
   ## CI report:
   
   * a7e983255a689ff59abd268ca0711d43b74a246b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26735)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17834: [FLINK-24941][datadog] Support Boolean gauges

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17834:
URL: https://github.com/apache/flink/pull/17834#issuecomment-973836525


   
   ## CI report:
   
   * da938cbcc8379d3692e66b674db85a790663f8f8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26742)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17637: [FLINK-24708][planner] Fix wrong results of the IN operator

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17637:
URL: https://github.com/apache/flink/pull/17637#issuecomment-957122792


   
   ## CI report:
   
   * d50d09006bb41b1c8ab87b868aa796637c27cce6 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26733)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #17834: [FLINK-24941][datadog] Support Boolean gauges

2021-11-18 Thread GitBox


flinkbot commented on pull request #17834:
URL: https://github.com/apache/flink/pull/17834#issuecomment-973837022


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit da938cbcc8379d3692e66b674db85a790663f8f8 (Fri Nov 19 
07:48:28 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-24941).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #17834: [FLINK-24941][datadog] Support Boolean gauges

2021-11-18 Thread GitBox


flinkbot commented on pull request #17834:
URL: https://github.com/apache/flink/pull/17834#issuecomment-973836525


   
   ## CI report:
   
   * da938cbcc8379d3692e66b674db85a790663f8f8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-24941) Cannot report backpressure with DatadogReporter

2021-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24941:
---
Labels: pull-request-available  (was: )

> Cannot report backpressure with DatadogReporter
> ---
>
> Key: FLINK-24941
> URL: https://issues.apache.org/jira/browse/FLINK-24941
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Ori Popowski
>Priority: Major
>  Labels: pull-request-available
>
> When using {{DatadogHttpReporter}} the log is full of these errors:
>  
> {code:java}
> 2021-11-16 09:51:11,521 [Flink-MetricRegistry-thread-1] INFO  
> org.apache.flink.metrics.datadog.DatadogHttpReporter  - The metric 
> flink.task.isBackPressured will not be reported because only number types are 
> supported by this reporter. {code}
> The code shows that the reason is that {{isBackPressured}} is a Boolean and 
> all Gauge values are converted to {{Number}} which results in 
> {{ClassCastException}} [1].
> I understand the limitation, but:
>  # This bug can be easily fixed
>  # Monitoring backpressure is extremely important. Without backpressure 
> monitoring there's no way of seeing backpressure history and no alerts.
> h3. Workaround
> For anyone interested, rewrite the 
> {{org.apache.flink.metrics.datadog.DGauge}} to map Booleans to integers 
> (false => 0, true => 1), and use the maven/sbt shade plugin to take your own 
> version of this class into the final JAR instead of the existing class from 
> the flink-metrics-datadog package.
>  
> [1] 
> https://github.com/apache/flink/blob/release-1.11/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java#L184-L188
>  
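
A minimal, self-contained sketch of the workaround described above: a gauge 
wrapper that coerces Boolean values to 0/1 so the reporter only ever sees 
Numbers. The class and method names here are illustrative, not the actual 
flink-metrics-datadog `DGauge` API.

```java
import java.util.function.Supplier;

public class BooleanSafeGauge {
    private final Supplier<Object> gauge;

    public BooleanSafeGauge(Supplier<Object> gauge) {
        this.gauge = gauge;
    }

    /** Returns the gauge value as a Number, mapping false => 0 and true => 1. */
    public Number getMetricValue() {
        Object value = gauge.get();
        if (value instanceof Boolean) {
            return ((Boolean) value) ? 1 : 0;
        }
        // Non-boolean, non-numeric values still fail fast, mirroring the
        // reporter's existing Number-only contract.
        return (Number) value;
    }

    public static void main(String[] args) {
        System.out.println(new BooleanSafeGauge(() -> true).getMetricValue());  // 1
        System.out.println(new BooleanSafeGauge(() -> 4.2).getMetricValue());   // 4.2
    }
}
```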



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] oripwk opened a new pull request #17834: [FLINK-24941][datadog] Support Boolean gauges

2021-11-18 Thread GitBox


oripwk opened a new pull request #17834:
URL: https://github.com/apache/flink/pull/17834


   ## What is the purpose of the change
   
   Supporting Boolean values for Datadog metrics reporter.
   
   
   ## Brief change log
   
 - `DGauge` is now parameterized in type `T` and will map boolean values to 
either `0` or `1` appropriately
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
 - Add a test case for DGauges that checks both booleans, and Numbers
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not applicable)
   Other reporters (such as Prometheus) know how to deal with Boolean 
values, but their documentation does not mention it, so there is no reason to 
mention it in the Datadog reporter either. Seems trivial.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dawidwys commented on pull request #17755: [FLINK-24858][core] Prevent version mismatches in TypeSerializers

2021-11-18 Thread GitBox


dawidwys commented on pull request #17755:
URL: https://github.com/apache/flink/pull/17755#issuecomment-973833069


   Could you add an additional note in release notes of 1.14 to upgrade 
directly to 1.14.1 when migrating from 1.13?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-24858) TypeSerializer version mismatch during eagerly restore

2021-11-18 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-24858:
-
Fix Version/s: 1.15.0
   1.14.1
   1.13.4

> TypeSerializer version mismatch during eagerly restore
> --
>
> Key: FLINK-24858
> URL: https://issues.apache.org/jira/browse/FLINK-24858
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.14.0, 1.13.3, 1.15.0
>Reporter: Fabian Paul
>Assignee: Fabian Paul
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> Currently, some of our TypeSerializer snapshots assume information about the 
> binary layout of the actual data rather than only holding information about 
> the TypeSerialzer.
> Multiple users ran into this problem 
> i.e.[https://lists.apache.org/thread/4q5q7wp0br96op6p7f695q2l8lz4wfzx|https://lists.apache.org/thread/4q5q7wp0br96op6p7f695q2l8lz4wfzx]
> {quote}This manifests itself when state is restored eagerly (for example an 
> operator state) but, for example, a user doesn't register the state in their 
> initializeState/open, and then a checkpoint happens.
> The result is that we will have elements serialized according to an old 
> binary layout, but our serializer snapshot declares a new version which 
> indicates that the elements are written with a new binary layout.
> The next restore will fail.
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] SteNicholas commented on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread GitBox


SteNicholas commented on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-973831379


   @Myasuka , could you please take a look at this pull request? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-23956) KafkaITCase.testOneSourceMultiplePartitions fails due to "The topic metadata failed to propagate to Kafka broker"

2021-11-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446332#comment-17446332
 ] 

Yun Gao commented on FLINK-23956:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26724=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7=6883]

> KafkaITCase.testOneSourceMultiplePartitions fails due to "The topic metadata 
> failed to propagate to Kafka broker"
> -
>
> Key: FLINK-23956
> URL: https://issues.apache.org/jira/browse/FLINK-23956
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Xintong Song
>Assignee: Fabian Paul
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22741=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6912
> {code}
> Aug 24 13:44:11 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 181.317 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Aug 24 13:44:11 [ERROR] 
> testOneSourceMultiplePartitions(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>   Time elapsed: 22.794 s  <<< FAILURE!
> Aug 24 13:44:11 java.lang.AssertionError: Create test topic : oneToManyTopic 
> failed, The topic metadata failed to propagate to Kafka broker.
> Aug 24 13:44:11   at org.junit.Assert.fail(Assert.java:88)
> Aug 24 13:44:11   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Aug 24 13:44:11   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Aug 24 13:44:11   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Aug 24 13:44:11   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneSourceMultiplePartitionsExactlyOnceTest(KafkaConsumerTestBase.java:1027)
> Aug 24 13:44:11   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneSourceMultiplePartitions(KafkaITCase.java:100)
> Aug 24 13:44:11   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Aug 24 13:44:11   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Aug 24 13:44:11   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Aug 24 13:44:11   at java.lang.reflect.Method.invoke(Method.java:498)
> Aug 24 13:44:11   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Aug 24 13:44:11   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Aug 24 13:44:11   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Aug 24 13:44:11   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Aug 24 13:44:11   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> Aug 24 13:44:11   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> Aug 24 13:44:11   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Aug 24 13:44:11   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * dfa47afcc103ce68babb7e2967afacfdecb01886 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26729)
 
   * d3df986a75e34e1ed475b2e1236b7770698e7bd1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26740)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22621) HBaseConnectorITCase.testTableSourceSinkWithDDL unstable on azure

2021-11-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446331#comment-17446331
 ] 

Yun Gao commented on FLINK-22621:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26724=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20=12619]

> HBaseConnectorITCase.testTableSourceSinkWithDDL unstable on azure
> -
>
> Key: FLINK-22621
> URL: https://issues.apache.org/jira/browse/FLINK-22621
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase, Table SQL / Ecosystem
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Roman Khachatryan
>Assignee: Arvid Heise
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17763=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20=12317]
>  
> {code:java}
> 2021-05-10T00:19:41.1703846Z May 10 00:19:41 
> testTableSourceSinkWithDDL[planner = BLINK_PLANNER, legacy = 
> false](org.apache.flink.connector.hbase2.HBaseConnectorITCase)  Time elapsed: 
> 2.907 sec  <<< FAILURE!
> 2021-05-10T00:19:41.1711710Z May 10 00:19:41 java.lang.AssertionError: 
> expected:<[+I[1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 
> 2019-08-18, 19:00, 12345678.0001], +I[2, 20, Hello-2, 200, 2.02, true, 
> Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002], +I[3, 30, 
> Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 
> 12345678.0003], +I[4, 40, null, 400, 4.04, true, Welt-4, 2019-08-18T19:03, 
> 2019-08-18, 19:03, 12345678.0004], +I[5, 50, Hello-5, 500, 5.05, false, 
> Welt-5, 2019-08-19T19:10, 2019-08-19, 19:10, 12345678.0005], +I[6, 60, 
> Hello-6, 600, 6.06, true, Welt-6, 2019-08-19T19:20, 2019-08-19, 19:20, 
> 12345678.0006], +I[7, 70, Hello-7, 700, 7.07, false, Welt-7, 
> 2019-08-19T19:30, 201 9-08-19, 19:30, 12345678.0007], +I[8, 80, null, 800, 
> 8.08, true, Welt-8, 2019-08-19T19:40, 2019-08-19, 19:40, 12345678.0008]]> but 
> was:<[+I[1, 10,  Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 
> 2019-08-18, 19:00, 12345678.0001], +I[2, 20, Hello-2, 200, 2.02, true, 
> Welt-2, 2019-08-18T19 :01, 2019-08-18, 19:01, 12345678.0002], +I[3, 30, 
> Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 
> 12345678.0003]]>
> 2021-05-10T00:19:41.1716769Z May 10 00:19:41at 
> org.junit.Assert.fail(Assert.java:88)
> 2021-05-10T00:19:41.1717997Z May 10 00:19:41at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2021-05-10T00:19:41.1718744Z May 10 00:19:41at 
> org.junit.Assert.assertEquals(Assert.java:118)
> 2021-05-10T00:19:41.1719472Z May 10 00:19:41at 
> org.junit.Assert.assertEquals(Assert.java:144)
> 2021-05-10T00:19:41.1720270Z May 10 00:19:41at 
> org.apache.flink.connector.hbase2.HBaseConnectorITCase.testTableSourceSinkWithDDL(HBaseConnecto
>  rITCase.java:506)
>  {code}
> Probably the same or similar to FLINK-19615



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot commented on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread GitBox


flinkbot commented on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-973809509


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 4be3984d373b3cf1bbb6573f9d73e01c02f306ab (Fri Nov 19 
07:10:37 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-24785).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-973807642


   
   ## CI report:
   
   * 4be3984d373b3cf1bbb6573f9d73e01c02f306ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26741)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17653: FLINK SQL checkpoint不生效

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17653:
URL: https://github.com/apache/flink/pull/17653#issuecomment-958686553


   
   ## CI report:
   
   * a3a776d1264735d38b9f5d979a617d65e5c5069a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26728)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread GitBox


flinkbot commented on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-973807642


   
   ## CI report:
   
   * 4be3984d373b3cf1bbb6573f9d73e01c02f306ab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24785:
---
Labels: pull-request-available  (was: )

> Relocate RocksDB's log under flink log directory by default
> ---
>
> Key: FLINK-24785
> URL: https://issues.apache.org/jira/browse/FLINK-24785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Previously, RocksDB's log was located in its own DB folder, which made 
> debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's log 
> directory by default.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] SteNicholas opened a new pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-18 Thread GitBox


SteNicholas opened a new pull request #17833:
URL: https://github.com/apache/flink/pull/17833


   ## What is the purpose of the change
   
   *Previously, RocksDB's log was located in its own DB folder, which made 
debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's log 
directory by default. The default `log_dir` of the `DBOptions` should be the 
Flink log directory path.*
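
   A hedged sketch of the core of this change: pointing RocksDB's own log 
output at Flink's log directory via `DBOptions`. The way the log path is 
obtained here (a system property with a fallback) is purely illustrative; the 
actual PR derives it from Flink's configured log location inside 
`RocksDBResourceContainer`.

```java
import org.rocksdb.DBOptions;

public class RocksDbLogDirSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical way to discover Flink's log directory; the real code
        // reads it from Flink's runtime configuration instead.
        String flinkLogDir = System.getProperty("log.file.dir", "/tmp/flink-logs");

        try (DBOptions dbOptions = new DBOptions()) {
            // Redirect RocksDB's LOG file away from the DB data folder.
            dbOptions.setDbLogDir(flinkLogDir);
            System.out.println(dbOptions.dbLogDir());
        }
    }
}
```

   With this option set, the `LOG` file RocksDB writes on open lands in the 
given directory instead of next to the SST files, which is what makes it easy 
to collect alongside Flink's own logs.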
   
   ## Brief change log
   
 - *Relocates the RocksDB's log directory option of the `DBOptions` to the 
Flink log directory in `RocksDBResourceContainer`.*
   
   ## Verifying this change
   
 - *Adds the test `testRelocateDBOptionsLogDir` in 
`RocksDBResourceContainerTest` to verify that the log directory of 
RocksDB relocates to the Flink log directory by default.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24935) Python module failed to compile due to "Could not create local repository"

2021-11-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446321#comment-17446321
 ] 

Yun Gao commented on FLINK-24935:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26723=logs=a29bcfe1-064d-50b9-354f-07802213a3c0=47ff6576-c9dc-5eab-9db8-183dcca3bede=28]

> Python module failed to compile due to "Could not create local repository"
> --
>
> Key: FLINK-24935
> URL: https://issues.apache.org/jira/browse/FLINK-24935
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.12.5
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Invoking mvn with 'mvn -Dmaven.wagon.http.pool=false --settings 
> /__w/1/s/tools/ci/google-mirror-settings.xml 
> -Dorg.slf4j.simpleLogger.showDateTime=true 
> -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS 
> -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
>  --no-snapshot-updates -B -Dhadoop.version=2.8.3 -Dinclude_hadoop_aws 
> -Dscala-2.11  clean deploy 
> -DaltDeploymentRepository=validation_repository::default::file:/tmp/flink-validation-deployment
>  -Dmaven.repo.local=/home/vsts/work/1/.m2/repository 
> -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 
> -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -U -DskipTests'
> [ERROR] Could not create local repository at /home/vsts/work/1/.m2/repository 
> -> [Help 1]
> [ERROR] 
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR] 
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/LocalRepositoryNotAccessibleException
>  {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26625=logs=a29bcfe1-064d-50b9-354f-07802213a3c0=47ff6576-c9dc-5eab-9db8-183dcca3bede]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557- add knn algorithm to flink-ml

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752903128



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/distance/EuclideanDistance.java
##
@@ -0,0 +1,259 @@
+package org.apache.flink.ml.classification.knn.distance;
+
+import org.apache.flink.ml.common.linalg.BLAS;
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+import org.apache.flink.ml.common.linalg.MatVecOp;
+import org.apache.flink.ml.common.linalg.SparseVector;
+import org.apache.flink.ml.common.linalg.Vector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+
+/**
+ * Euclidean distance is the "ordinary" straight-line distance between two 
points in Euclidean
+ * space.
+ *
+ * https://en.wikipedia.org/wiki/Euclidean_distance
+ *
+ * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| 
means the L2 norm of
+ * the vector.
+ */
+public class EuclideanDistance extends BaseFastDistance {

Review comment:
   yes
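The javadoc above defines the Euclidean distance as ||a - b||, the L2 norm of the difference vector. A minimal standalone sketch of that formula (not the flink-ml `EuclideanDistance` implementation, which works on its own matrix/vector types) looks like:

```java
public class EuclideanDistanceSketch {

    // Computes ||a - b||, the L2 norm of the element-wise difference.
    static double distance(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vector sizes must match.");
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        double[] a = {0.0, 3.0};
        double[] b = {4.0, 0.0};
        // sqrt((0-4)^2 + (3-0)^2) = sqrt(25) = 5.0
        System.out.println(distance(a, b)); // 5.0
    }
}
```

The production class additionally caches vector norms so that pairwise distances can be computed as sqrt(||a||^2 + ||b||^2 - 2*a.b), which is the usual way to accelerate batch KNN distance calculation.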
   








[jira] [Commented] (FLINK-24763) ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure

2021-11-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446320#comment-17446320
 ] 

Yun Gao commented on FLINK-24763:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26722=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=2e426bf0-b717-56bb-ab62-d63086457354=13523]

> ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure
> ---
>
> Key: FLINK-24763
> URL: https://issues.apache.org/jira/browse/FLINK-24763
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Jingsong Lee
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0
>
>
> The test {{ParquetFileSystemITCase.testLimitableBulkFormat}} fails with 
> {code}
> 2021-11-03T22:10:11.5106075Z Nov 03 22:10:11 [ERROR] 
> testLimitableBulkFormat[false]  Time elapsed: 9.177 s  <<< ERROR!
> 2021-11-03T22:10:11.5106643Z Nov 03 22:10:11 java.lang.RuntimeException: 
> Failed to fetch next result
> 2021-11-03T22:10:11.5107213Z Nov 03 22:10:11  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-11-03T22:10:11.5111034Z Nov 03 22:10:11  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-11-03T22:10:11.5112190Z Nov 03 22:10:11  at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:188)
> 2021-11-03T22:10:11.5112892Z Nov 03 22:10:11  at 
> java.util.Iterator.forEachRemaining(Iterator.java:115)
> 2021-11-03T22:10:11.5113393Z Nov 03 22:10:11  at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
> 2021-11-03T22:10:11.5114157Z Nov 03 22:10:11  at 
> org.apache.flink.formats.parquet.ParquetFileSystemITCase.testLimitableBulkFormat(ParquetFileSystemITCase.java:128)
> 2021-11-03T22:10:11.5114951Z Nov 03 22:10:11  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-11-03T22:10:11.5115568Z Nov 03 22:10:11  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-11-03T22:10:11.5116115Z Nov 03 22:10:11  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-11-03T22:10:11.5116591Z Nov 03 22:10:11  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-11-03T22:10:11.5117088Z Nov 03 22:10:11  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-11-03T22:10:11.5117807Z Nov 03 22:10:11  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-11-03T22:10:11.5118821Z Nov 03 22:10:11  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-11-03T22:10:11.5119417Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-11-03T22:10:11.5119944Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-11-03T22:10:11.5120427Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-11-03T22:10:11.5120919Z Nov 03 22:10:11  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2021-11-03T22:10:11.5121571Z Nov 03 22:10:11  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-11-03T22:10:11.5122526Z Nov 03 22:10:11  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-11-03T22:10:11.5123245Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-11-03T22:10:11.5123804Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-11-03T22:10:11.5124314Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-11-03T22:10:11.5124806Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-11-03T22:10:11.5125313Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-11-03T22:10:11.5125810Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-11-03T22:10:11.5126281Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2021-11-03T22:10:11.5126739Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2021-11-03T22:10:11.5127349Z Nov 03 22:10:11  at 
> 

[jira] [Commented] (FLINK-24960) YARNSessionCapacitySchedulerITCase.testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots ha

2021-11-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446318#comment-17446318
 ] 

Yun Gao commented on FLINK-24960:
-

Hi [~chesnay]  could you help to have a look at this issue~?

> YARNSessionCapacitySchedulerITCase.testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  hangs on azure
> ---
>
> Key: FLINK-24960
> URL: https://issues.apache.org/jira/browse/FLINK-24960
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Nov 18 22:37:08 
> 
> Nov 18 22:37:08 Test 
> testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
>  is running.
> Nov 18 22:37:08 
> 
> Nov 18 22:37:25 22:37:25,470 [main] INFO  
> org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase [] - Extracted 
> hostname:port: 5718b812c7ab:38622
> Nov 18 22:52:36 
> ==
> Nov 18 22:52:36 Process produced no output for 900 seconds.
> Nov 18 22:52:36 
> ==
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26722=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=36395





[jira] [Created] (FLINK-24960) YARNSessionCapacitySchedulerITCase.testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots hang

2021-11-18 Thread Yun Gao (Jira)
Yun Gao created FLINK-24960:
---

 Summary: 
YARNSessionCapacitySchedulerITCase.testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
 hangs on azure
 Key: FLINK-24960
 URL: https://issues.apache.org/jira/browse/FLINK-24960
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.15.0
Reporter: Yun Gao


{code:java}
Nov 18 22:37:08 

Nov 18 22:37:08 Test 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
 is running.
Nov 18 22:37:08 

Nov 18 22:37:25 22:37:25,470 [main] INFO  
org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase [] - Extracted 
hostname:port: 5718b812c7ab:38622
Nov 18 22:52:36 
==
Nov 18 22:52:36 Process produced no output for 900 seconds.
Nov 18 22:52:36 
==
 {code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26722=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=36395





[GitHub] [flink] Aitozi commented on pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-18 Thread GitBox


Aitozi commented on pull request #17554:
URL: https://github.com/apache/flink/pull/17554#issuecomment-973793867


   @wangyang0918 Thanks for your comments, I will address them this weekend.






[jira] [Created] (FLINK-24959) Add a BitMap function to FlinkSQL

2021-11-18 Thread ZhuoYu Chen (Jira)
ZhuoYu Chen created FLINK-24959:
---

 Summary: Add a BitMap function to FlinkSQL
 Key: FLINK-24959
 URL: https://issues.apache.org/jira/browse/FLINK-24959
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.15.0
Reporter: ZhuoYu Chen


bitmap_and: computes the intersection of two input bitmaps and returns the new 
bitmap.

bitmap_andnot: computes the difference set, i.e. the set of elements that are 
in A but not in B.

Plus bitmap functions related to join operations, etc.
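The intersection/difference semantics requested here can be illustrated with `java.util.BitSet`, whose `and`/`andNot` methods are an in-memory analogue of the proposed SQL functions (illustration of the semantics only, not the proposed Flink implementation):

```java
import java.util.BitSet;

public class BitmapSemanticsDemo {
    public static void main(String[] args) {
        BitSet a = new BitSet();
        a.set(1); a.set(2); a.set(3);   // A = {1, 2, 3}
        BitSet b = new BitSet();
        b.set(2); b.set(3); b.set(4);   // B = {2, 3, 4}

        // bitmap_and semantics: intersection of A and B.
        BitSet and = (BitSet) a.clone();
        and.and(b);

        // bitmap_andnot semantics: elements in A but not in B.
        BitSet andNot = (BitSet) a.clone();
        andNot.andNot(b);

        System.out.println(and);      // {2, 3}
        System.out.println(andNot);   // {1}
    }
}
```

Engines that offer such functions (e.g. Doris, ClickHouse) typically back them with compressed bitmaps such as Roaring bitmaps rather than a plain `BitSet`.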





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752892098



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,223 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.castTo;
+import static org.apache.flink.ml.classification.knn.KnnUtil.serializeResult;
+import static org.apache.flink.ml.classification.knn.KnnUtil.updateQueue;
+
+/** knn model data, which will be used to calculate the distances between 
nodes. */
+public class KnnModelData implements Serializable, Cloneable {
+private static final long serialVersionUID = -2940551481683238630L;
+private final List dictData;
+private final EuclideanDistance fastDistance;
+protected Comparator> comparator;
+private DataType idType;
+
+/**
+ * constructor.
+ *
+ * @param list BaseFastDistanceData list.
+ * @param fastDistance used to accelerate the speed of calculating 
distance.
+ */
+public KnnModelData(List list, EuclideanDistance 
fastDistance) {
+this.dictData = list;
+this.fastDistance = fastDistance;
+comparator = Comparator.comparingDouble(o -> -o.f0);
+}
+
+/**
+ * set id type.
+ *
+ * @param idType id type.
+ */
+public void setIdType(DataType idType) {
+this.idType = idType;
+}
+
+/**
+ * find the nearest topN neighbors from whole nodes.
+ *
+ * @param input input node.
+ * @param topN top N.
+ * @param radius the parameter to describe the range to find neighbors.
+ * @return
+ */
+public String findNeighbor(Object input, Integer topN, Double radius) {

Review comment:
   done, thanks for your suggestion.
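The `findNeighbor(input, topN, radius)` method discussed above performs a bounded top-N nearest-neighbor search. A minimal sketch of that pattern — a max-heap keyed on distance, with an optional radius filter — using hypothetical scalar inputs and label names (not the PR's implementation, which operates on `FastDistanceMatrixData`):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopNSearchSketch {

    // Returns labels of the (at most) topN candidates closest to `query`,
    // optionally restricted to candidates within `radius` (null = no filter).
    static List<String> findNeighbors(
            double query, double[] candidates, String[] labels,
            int topN, Double radius) {
        // Max-heap on distance: the worst kept neighbor sits on top,
        // so it can be evicted when a closer candidate arrives.
        PriorityQueue<double[]> heap =
                new PriorityQueue<>(Comparator.comparingDouble((double[] t) -> -t[0]));
        for (int i = 0; i < candidates.length; i++) {
            double dist = Math.abs(query - candidates[i]);
            if (radius != null && dist > radius) {
                continue; // outside the search radius
            }
            if (heap.size() < topN) {
                heap.add(new double[] {dist, i});
            } else if (heap.peek()[0] > dist) {
                heap.poll(); // evict current farthest neighbor
                heap.add(new double[] {dist, i});
            }
        }
        List<String> result = new ArrayList<>();
        for (double[] t : heap) {
            result.add(labels[(int) t[1]]);
        }
        return result;
    }

    public static void main(String[] args) {
        double[] cands = {0.5, 2.0, 9.0};
        String[] labels = {"a", "b", "c"};
        // contains "a" and "b"; heap iteration order is unspecified
        System.out.println(findNeighbors(1.0, cands, labels, 2, null));
    }
}
```

This mirrors why the model data keeps `Comparator.comparingDouble(o -> -o.f0)`: negating the distance turns Java's min-heap `PriorityQueue` into a max-heap over distances.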

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,343 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752891995



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/DenseMatrix.java
##
@@ -0,0 +1,161 @@
+package org.apache.flink.ml.classification.knn;
+
+import java.io.Serializable;
+
+/**
+ * Knn DenseMatrix stores dense matrix data and provides some methods to 
operate on the matrix it
+ * represents. This data structure helps knn to accelerate distance 
calculation.
+ */
+public class DenseMatrix implements Serializable {

Review comment:
   done, thanks for your suggestion.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752892347



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/Knn.java
##
@@ -0,0 +1,220 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.MapPartitionFunctionWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColIndex;
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColIndices;
+import static org.apache.flink.ml.classification.knn.KnnUtil.merge;
+import static org.apache.flink.ml.classification.knn.KnnUtil.pGson;
+
+/**
+ * KNN classifier is to classify unlabeled observations by assigning them to 
the class of the most

Review comment:
   done, thanks for your suggestion.








[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * 9d114b08b78d74c4d9a198c09bd8ad7c99eeaf0e Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26693)
 
   * dfa47afcc103ce68babb7e2967afacfdecb01886 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26729)
 
   * d3df986a75e34e1ed475b2e1236b7770698e7bd1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26740)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752889883



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,343 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.extractObject;
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColTypes;
+import static org.apache.flink.ml.classification.knn.KnnUtil.pGson;
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.resolvedSchema2Schema;
+
+/** Knn classification model fitted by KnnClassifier. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+public static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = outputSchema.getColumnDataTypes().toArray(new 
DataType[0]);
+

[GitHub] [flink-ml] HuangXingBo commented on pull request #36: [FLINK-24933][ML] Support ML Python API to implement FLIP-173 and FLP-174

2021-11-18 Thread GitBox


HuangXingBo commented on pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#issuecomment-973787987


   @lindong28 Thanks a lot for the review. I have addressed the comments at the 
latest commit.






[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * 9d114b08b78d74c4d9a198c09bd8ad7c99eeaf0e Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26693)
 
   * dfa47afcc103ce68babb7e2967afacfdecb01886 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26729)
 
   * d3df986a75e34e1ed475b2e1236b7770698e7bd1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Updated] (FLINK-24835) "group by" in the interval join will throw a exception

2021-11-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-24835:
---
Fix Version/s: (was: 1.13.4)

> "group by" in the interval join will throw a exception
> --
>
> Key: FLINK-24835
> URL: https://issues.apache.org/jira/browse/FLINK-24835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: Jing Zhang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> Can reproduce this bug by the following code added into 
> IntervalJoinTest.scala:
> {code:java}
> @Test
> def testSemiIntervalJoinWithSimpleConditionAndGroup(): Unit = {
>   val sql =
> """
>   |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
>   | SELECT t2.a FROM MyTable2 t2
>   |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime 
> + INTERVAL '5' MINUTE
>   |   GROUP BY t2.a
>   |)
> """.stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The exception is :
> {code:java}
> java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>     at 
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:64)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:64)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at 

[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752814620



##
File path: flink-ml-python/apache_flink_ml/mllib/__init__.py
##
@@ -0,0 +1,17 @@
+

Review comment:
   I originally thought that `ml` corresponds to the `flink-ml` module, and `mllib` corresponds to `flink-ml-lib`, which contains various algorithm implementations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17831: [FLINK-15825][Table SQL/API] Add renameDatabase() to Catalog

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17831:
URL: https://github.com/apache/flink/pull/17831#issuecomment-973310719


   
   ## CI report:
   
   * cf69d827d81b781bc21b2faf0badb552aa884415 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26726)
   * a7e983255a689ff59abd268ca0711d43b74a246b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26735)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24495) Python installdeps hangs

2021-11-18 Thread Yun Gao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446298#comment-17446298 ]

Yun Gao commented on FLINK-24495:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26722=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=23596]

> Python installdeps hangs
> 
>
> Key: FLINK-24495
> URL: https://issues.apache.org/jira/browse/FLINK-24495
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: Xintong Song
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24922=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=23587
> {code}
> Oct 10 02:30:01 py38-cython create: /__w/1/s/flink-python/.tox/py38-cython
> Oct 10 02:30:04 py38-cython installdeps: pytest, apache-beam==2.27.0, cython==0.29.16, grpcio>=1.29.0,<2, grpcio-tools>=1.3.5,<=1.14.2, apache-flink-libraries
> Oct 10 02:45:22 ==============================================================
> Oct 10 02:45:22 Process produced no output for 900 seconds.
> Oct 10 02:45:22 ==============================================================



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17832:
URL: https://github.com/apache/flink/pull/17832#issuecomment-973714890


   
   ## CI report:
   
   * 5b2208841608db9795796828f11c82060392a777 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26736)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24216) org.apache.flink.test.scheduling.AdaptiveSchedulerITCase#testStopWithSavepointFailOnFirstSavepointSucceedOnSecond blocked on azure

2021-11-18 Thread Yun Gao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446296#comment-17446296 ]

Yun Gao commented on FLINK-24216:
-

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26692=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=12879]

> org.apache.flink.test.scheduling.AdaptiveSchedulerITCase#testStopWithSavepointFailOnFirstSavepointSucceedOnSecond blocked on azure
> --
>
> Key: FLINK-24216
> URL: https://issues.apache.org/jira/browse/FLINK-24216
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23735=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=12914
> {code:java}
> Sep 08 06:46:01 "main" #1 prio=5 os_prio=0 tid=0x7f23c400b800 nid=0x498c waiting on condition [0x7f23cc276000]
> Sep 08 06:46:01    java.lang.Thread.State: WAITING (parking)
> Sep 08 06:46:01   at sun.misc.Unsafe.park(Native Method)
> Sep 08 06:46:01   - parking to wait for  <0x86d021a8> (a java.util.concurrent.CountDownLatch$Sync)
> Sep 08 06:46:01   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Sep 08 06:46:01   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> Sep 08 06:46:01   at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> Sep 08 06:46:01   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> Sep 08 06:46:01   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> Sep 08 06:46:01   at org.apache.flink.test.scheduling.AdaptiveSchedulerITCase$DummySource.awaitRunning(AdaptiveSchedulerITCase.java:272)
> Sep 08 06:46:01   at org.apache.flink.test.scheduling.AdaptiveSchedulerITCase$DummySource.access$100(AdaptiveSchedulerITCase.java:255)
> Sep 08 06:46:01   at org.apache.flink.test.scheduling.AdaptiveSchedulerITCase.testStopWithSavepointFailOnFirstSavepointSucceedOnSecond(AdaptiveSchedulerITCase.java:219)
> Sep 08 06:46:01   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Sep 08 06:46:01   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 08 06:46:01   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 08 06:46:01   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 08 06:46:01   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 08 06:46:01   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 08 06:46:01   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 08 06:46:01   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 08 06:46:01   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Sep 08 06:46:01   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Sep 08 06:46:01   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Sep 08 06:46:01   at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Sep 08 06:46:01   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


lindong28 commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752877803



##
File path: flink-ml-python/apache_flink_ml/ml/api/core.py
##
@@ -0,0 +1,221 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from abc import ABC, abstractmethod
+from typing import TypeVar, Generic, List
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import Table
+
+from apache_flink_ml.ml.param.param import WithParams
+
+T = TypeVar('T')
+E = TypeVar('E')
+M = TypeVar('M')
+
+
+class Stage(WithParams[T], ABC):
+    """
+    Base class for a node in a Pipeline or Graph. The interface is only a concept, and does not
+    have any actual functionality. Its subclasses could be Estimator, Model, Transformer or
+    AlgoOperator. No other classes should inherit this interface directly.
+
+    Each stage is with parameters, and requires a public empty constructor for restoration.
+    """
+
+    @abstractmethod
+    def save(self, path: str) -> None:
+        """
+        Saves this stage to the given path.
+        """
+        pass
+
+    @classmethod
+    @abstractmethod
+    def load(cls, env: StreamExecutionEnvironment, path: str) -> T:
+        """
+        Instantiates a new stage instance based on the data read from the given path.
+        """
+        pass
+
+
+class AlgoOperator(Stage[T], ABC):
+    """
+    An AlgoOperator takes a list of tables as inputs and produces a list of tables as results.
+    It can be used to encode generic multi-input multi-output computation logic.
+    """
+
+    @abstractmethod
+    def transform(self, *inputs: Table) -> List[Table]:
+        """
+        Applies the AlgoOperator on the given input tables and returns the result tables.
+
+        :param inputs: A list of tables.
+        :return: A list of tables.
+        """
+        pass
+
+
+class Transformer(AlgoOperator[T], ABC):
+    """
+    A Transformer is an AlgoOperator with the semantic difference that it encodes the
+    Transformation logic, such that a record in the output typically corresponds to one record
+    in the input. In contrast, an AlgoOperator is a better fit to express aggregation logic
+    where a record in the output could be computed from an arbitrary number of records in the
+    input.
+    """
+    pass
+
+
+class Model(Transformer[T], ABC):
+    """
+    A Model is typically generated by invoking :func:`~Estimator.fit`. A Model is a Transformer
+    with the extra APIs to set and get model data.
+    """
+
+    def set_model_data(self, *inputs: Table) -> None:
+        raise Exception("This operation is not supported.")
+
+    def get_model_data(self) -> None:
+        """
+        Gets a list of tables representing the model data. Each table could be an unbounded
+        stream of model data changes.
+
+        :return: A list of tables.
+        """
+        raise Exception("This operation is not supported.")
+
+
+class Estimator(Generic[E, M], Stage[E], ABC):
+    """
+    Estimators are responsible for training and generating Models.
+    """
+
+    def fit(self, *inputs: Table) -> Model[M]:
+        """
+        Trains on the given inputs and produces a Model.
+
+        :param inputs: A list of tables.
+        :return: A Model.
+        """
+        pass
+
+
+class PipelineModel(Model):
+    """
+    A PipelineModel acts as a Model. It consists of an ordered list of stages, each of which
+    could be a Model, Transformer or AlgoOperator.
+    """
+
+    def __init__(self, stages: List[Stage]):
+        self._stages = stages
+
+    def transform(self, *inputs: Table) -> List[Table]:
+        """
+        Applies all stages in this PipelineModel on the input tables in order. The output of
+        one stage is used as the input of the next stage (if any). The output of the last
+        stage is returned as the result of this method.
+
+        :param inputs: A list of tables.
+        :return: A list of tables.
+        """
+        for stage in self._stages:
+            if 

[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


lindong28 commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752877279



##
File path: flink-ml-python/apache_flink_ml/ml/api/core.py
##
@@ -0,0 +1,221 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from abc import ABC, abstractmethod
+from typing import TypeVar, Generic, List
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import Table
+
+from apache_flink_ml.ml.param.param import WithParams
+
+T = TypeVar('T')
+E = TypeVar('E')
+M = TypeVar('M')
+
+
+class Stage(WithParams[T], ABC):
+    """
+    Base class for a node in a Pipeline or Graph. The interface is only a concept, and does not
+    have any actual functionality. Its subclasses could be Estimator, Model, Transformer or
+    AlgoOperator. No other classes should inherit this interface directly.
+
+    Each stage is with parameters, and requires a public empty constructor for restoration.
+    """
+
+    @abstractmethod
+    def save(self, path: str) -> None:
+        """
+        Saves this stage to the given path.
+        """
+        pass
+
+    @classmethod
+    @abstractmethod
+    def load(cls, env: StreamExecutionEnvironment, path: str) -> T:
+        """
+        Instantiates a new stage instance based on the data read from the given path.
+        """
+        pass
+
+
+class AlgoOperator(Stage[T], ABC):
+    """
+    An AlgoOperator takes a list of tables as inputs and produces a list of tables as results.
+    It can be used to encode generic multi-input multi-output computation logic.
+    """
+
+    @abstractmethod
+    def transform(self, *inputs: Table) -> List[Table]:
+        """
+        Applies the AlgoOperator on the given input tables and returns the result tables.
+
+        :param inputs: A list of tables.
+        :return: A list of tables.
+        """
+        pass
+
+
+class Transformer(AlgoOperator[T], ABC):
+    """
+    A Transformer is an AlgoOperator with the semantic difference that it encodes the
+    Transformation logic, such that a record in the output typically corresponds to one record
+    in the input. In contrast, an AlgoOperator is a better fit to express aggregation logic
+    where a record in the output could be computed from an arbitrary number of records in the
+    input.
+    """
+    pass
+
+
+class Model(Transformer[T], ABC):
+    """
+    A Model is typically generated by invoking :func:`~Estimator.fit`. A Model is a Transformer
+    with the extra APIs to set and get model data.
+    """
+
+    def set_model_data(self, *inputs: Table) -> None:
+        raise Exception("This operation is not supported.")
+
+    def get_model_data(self) -> None:
+        """
+        Gets a list of tables representing the model data. Each table could be an unbounded
+        stream of model data changes.
+
+        :return: A list of tables.
+        """
+        raise Exception("This operation is not supported.")
+
+
+class Estimator(Generic[E, M], Stage[E], ABC):
+    """
+    Estimators are responsible for training and generating Models.
+    """
+
+    def fit(self, *inputs: Table) -> Model[M]:
+        """
+        Trains on the given inputs and produces a Model.
+
+        :param inputs: A list of tables.
+        :return: A Model.
+        """
+        pass
+
+
+class PipelineModel(Model):
+    """
+    A PipelineModel acts as a Model. It consists of an ordered list of stages, each of which
+    could be a Model, Transformer or AlgoOperator.
+    """
+
+    def __init__(self, stages: List[Stage]):
+        self._stages = stages
+
+    def transform(self, *inputs: Table) -> List[Table]:
+        """
+        Applies all stages in this PipelineModel on the input tables in order. The output of
+        one stage is used as the input of the next stage (if any). The output of the last
+        stage is returned as the result of this method.
+
+        :param inputs: A list of tables.
+        :return: A list of tables.
+        """
+        for stage in self._stages:
+            if 

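[Editorial note] The `PipelineModel.transform` contract quoted in the review hunks above — each stage's output becomes the next stage's input, and the last stage's output is returned — can be illustrated with a minimal, self-contained Python sketch. The `Stage` callable type and list-based "tables" below are hypothetical stand-ins, not the actual flink-ml classes:

```python
from typing import Callable, List

# Hypothetical stand-in for a stage: any callable mapping a list of
# "tables" (here just Python lists) to a list of "tables".
Stage = Callable[[List[list]], List[list]]

def pipeline_transform(stages: List[Stage], inputs: List[list]) -> List[list]:
    """Chain stages: the output of one stage is the input of the next."""
    tables = inputs
    for stage in stages:
        tables = stage(tables)
    return tables  # output of the last stage

# Two toy stages: one doubles every value, one keeps values greater than 2.
double = lambda tables: [[x * 2 for x in t] for t in tables]
keep_big = lambda tables: [[x for x in t if x > 2] for t in tables]

result = pipeline_transform([double, keep_big], [[1, 2, 3]])
# result == [[4, 6]]
```

The same chaining shape is what the quoted `for stage in self._stages:` loop implements over Flink `Table` objects.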
[GitHub] [flink] flinkbot edited a comment on pull request #17771: [FLINK-24813][table-planner]Improve ImplicitTypeConversionITCase

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17771:
URL: https://github.com/apache/flink/pull/17771#issuecomment-966899848


   
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 3c13e34acd0884702aec4ded14bba94b62a6ba22 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26738)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17771: [FLINK-24813][table-planner]Improve ImplicitTypeConversionITCase

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17771:
URL: https://github.com/apache/flink/pull/17771#issuecomment-966899848


   
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 3c13e34acd0884702aec4ded14bba94b62a6ba22 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xuyangzhong commented on pull request #17771: [FLINK-24813][table-planner]Improve ImplicitTypeConversionITCase

2021-11-18 Thread GitBox


xuyangzhong commented on pull request #17771:
URL: https://github.com/apache/flink/pull/17771#issuecomment-973767628


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread zoucao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446284#comment-17446284 ]

zoucao commented on FLINK-24958:


Thanks for the reminder, [~leonard]. The change is minor and simple, so I opened the PR directly.
I'll pay attention to this next time. Thanks again.

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17582: [FLINK-24674][kubernetes] Create corresponding resouces for task manager Pods

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17582:
URL: https://github.com/apache/flink/pull/17582#issuecomment-953241058


   
   ## CI report:
   
   * 3eb296e77182671d24dab3c29b9d874860165725 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26737)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] RocMarshal commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-11-18 Thread GitBox


RocMarshal commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r752861919



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);

Review comment:
   > In any case, we should start putting each operation execution in a 
separate class. Otherwise `OperationConverterUtils` or `OperationConverter` 
grow infinitely. `OperationConverter` can just be a registry for mapping the 
instance to a class. We can then make certain entities available e.g. schema 
resolver or catalog manager.
   
   @twalthr @wuchong From my limited reading, it would be better to do this refactoring of the OperationConverter parts after FLINK-21634 is completed. Please let me know your opinion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

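[Editorial note] The `convertRenameColumn` logic quoted above rebuilds the column list with the new name and remaps any primary-key entries that referenced the renamed column. The same idea can be sketched in plain Python; the dict-based schema here is a hypothetical simplification of Flink's `Schema`/`Schema.Builder`, not the real API:

```python
def rename_column(schema: dict, old: str, new: str) -> dict:
    """Return a copy of `schema` with column `old` renamed to `new`,
    keeping primary-key references consistent with the rename."""
    if old not in schema["columns"]:
        raise ValueError(f"Column {old!r} does not exist")
    if new in schema["columns"]:
        raise ValueError(f"Column {new!r} already exists")
    return {
        # Rebuild the column list, swapping in the new name.
        "columns": [new if c == old else c for c in schema["columns"]],
        # Primary-key entries referring to the renamed column must follow it.
        "primary_key": [new if c == old else c for c in schema.get("primary_key", [])],
    }

schema = {"columns": ["id", "name", "ts"], "primary_key": ["id"]}
renamed = rename_column(schema, "id", "user_id")
# renamed == {'columns': ['user_id', 'name', 'ts'], 'primary_key': ['user_id']}
```

As in the Java hunk, watermark expressions referencing the old name would need the same remapping; that step is omitted here for brevity.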



[GitHub] [flink] flinkbot edited a comment on pull request #17830: [FLINK-24893][Table SQL/Client][FLIP-189] SQL Client prompts customization

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17830:
URL: https://github.com/apache/flink/pull/17830#issuecomment-973131002


   
   ## CI report:
   
   * 63ddba017e24c798b2f75f773b645ecaa22b5bc3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26727)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 12bc9ca2640c7773b2ca7fa50e204605183ff309 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26734)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752855277



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,223 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.castTo;
+import static org.apache.flink.ml.classification.knn.KnnUtil.serializeResult;
+import static org.apache.flink.ml.classification.knn.KnnUtil.updateQueue;
+
+/** knn model data, which will be used to calculate the distances between nodes. */
+public class KnnModelData implements Serializable, Cloneable {
+    private static final long serialVersionUID = -2940551481683238630L;
+    private final List dictData;
+    private final EuclideanDistance fastDistance;
+    protected Comparator> comparator;
+    private DataType idType;
+
+    /**
+     * constructor.
+     *
+     * @param list BaseFastDistanceData list.
+     * @param fastDistance used to accelerate the speed of calculating distance.
+     */
+    public KnnModelData(List list, EuclideanDistance fastDistance) {
+        this.dictData = list;
+        this.fastDistance = fastDistance;
+        comparator = Comparator.comparingDouble(o -> -o.f0);
+    }
+
+    /**
+     * set id type.
+     *
+     * @param idType id type.
+     */
+    public void setIdType(DataType idType) {
+        this.idType = idType;
+    }
+
+    /**
+     * find the nearest topN neighbors from whole nodes.
+     *
+     * @param input input node.
+     * @param topN top N.
+     * @param radius the parameter to describe the range to find neighbors.
+     * @return
+     */
+    public String findNeighbor(Object input, Integer topN, Double radius) {

Review comment:
   I agree with you. I will change the code later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

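[Editorial note] The `findNeighbor` routine quoted above, together with `comparator = Comparator.comparingDouble(o -> -o.f0)` and the `PriorityQueue` import, points at a bounded max-heap over distances: keep at most `topN` candidates and evict the current worst in O(log n). A minimal Python sketch of that idea (not the PR's actual implementation) uses `heapq`, which is a min-heap, hence the negated distances:

```python
import heapq
import math

def find_neighbors(query, points, top_n, radius=None):
    """Return up to top_n (distance, point) pairs closest to `query`,
    optionally restricted to candidates within `radius`."""
    heap = []  # entries are (-distance, index); the root is the worst kept candidate
    for i, p in enumerate(points):
        d = math.dist(query, p)  # Euclidean distance (Python 3.8+)
        if radius is not None and d > radius:
            continue
        if len(heap) < top_n:
            heapq.heappush(heap, (-d, i))
        elif -d > heap[0][0]:  # d is smaller than the current worst kept distance
            heapq.heapreplace(heap, (-d, i))
    # Undo the negation and sort by ascending distance for the caller.
    return sorted((-neg_d, points[i]) for neg_d, i in heap)

points = [(0.0, 0.0), (1.0, 0.0), (3.0, 4.0)]
neighbors = find_neighbors((0.0, 0.0), points, top_n=2)
# neighbors == [(0.0, (0.0, 0.0)), (1.0, (1.0, 0.0))]
```

Negating the distance is the same trick as the Java comparator: it turns a min-heap into a max-heap keyed on distance, so the farthest retained neighbor sits at the root.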



[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752856546



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnUtil.java
##
@@ -0,0 +1,428 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.StringUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+
+/** Utility class for the knn algorithm. */
+public class KnnUtil {

Review comment:
   OK, I will refine it.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752856415



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/Knn.java
##
@@ -0,0 +1,220 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.MapPartitionFunctionWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColIndex;
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColIndices;
+import static org.apache.flink.ml.classification.knn.KnnUtil.merge;
+import static org.apache.flink.ml.classification.knn.KnnUtil.pGson;
+
+/**
+ * KNN classifier is to classify unlabeled observations by assigning them to 
the class of the most

Review comment:
   OK. I will update the comment.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752856110



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,343 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.extractObject;
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColTypes;
+import static org.apache.flink.ml.classification.knn.KnnUtil.pGson;
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.resolvedSchema2Schema;
+
+/** Knn classification model fitted by KnnClassifier. */
+public class KnnModel implements Model<KnnModel>, KnnParams<KnnModel> {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+public static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map<Param<?>, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map<Param<?>, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream<Row> input = tEnv.toDataStream(inputs[0]);
+DataStream<Row> model = tEnv.toDataStream(modelData[0]);
+
+Map<String, DataStream<Row>> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = outputSchema.getColumnDataTypes().toArray(new 
DataType[0]);
+

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752855277



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,223 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.castTo;
+import static org.apache.flink.ml.classification.knn.KnnUtil.serializeResult;
+import static org.apache.flink.ml.classification.knn.KnnUtil.updateQueue;
+
+/** knn model data, which will be used to calculate the distances between 
nodes. */
+public class KnnModelData implements Serializable, Cloneable {
+private static final long serialVersionUID = -2940551481683238630L;
+private final List dictData;
+private final EuclideanDistance fastDistance;
+protected Comparator<Tuple2<Double, Object>> comparator;
+private DataType idType;
+
+/**
+ * constructor.
+ *
+ * @param list BaseFastDistanceData list.
+ * @param fastDistance used to accelerate the speed of calculating 
distance.
+ */
+public KnnModelData(List list, EuclideanDistance 
fastDistance) {
+this.dictData = list;
+this.fastDistance = fastDistance;
+comparator = Comparator.comparingDouble(o -> -o.f0);
+}
+
+/**
+ * set id type.
+ *
+ * @param idType id type.
+ */
+public void setIdType(DataType idType) {
+this.idType = idType;
+}
+
+/**
+ * find the nearest topN neighbors from whole nodes.
+ *
+ * @param input input node.
+ * @param topN top N.
+ * @param radius the parameter to describe the range to find neighbors.
+ * @return
+ */
+public String findNeighbor(Object input, Integer topN, Double radius) {

Review comment:
   The calculation uses the model to make predictions.




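As an aside for readers of this thread: the `findNeighbor(input, topN, radius)` signature discussed above is commonly implemented with a bounded max-heap over candidate distances. The sketch below is purely illustrative — the class, method, and array-based vector representation are invented for this example and are not the PR's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative sketch of a top-N nearest-neighbor search with an optional radius filter. */
public class TopNSketch {
    /** Keeps the topN smallest distances (optionally within radius) using a max-heap. */
    public static List<double[]> findNeighbors(
            double[][] candidates, double[] input, int topN, Double radius) {
        // Max-heap on distance: the root is the worst neighbor kept so far.
        PriorityQueue<double[]> heap =
                new PriorityQueue<>((a, b) -> Double.compare(b[0], a[0]));
        for (int i = 0; i < candidates.length; i++) {
            double d = 0;
            for (int j = 0; j < input.length; j++) {
                double diff = candidates[i][j] - input[j];
                d += diff * diff;
            }
            d = Math.sqrt(d);
            if (radius != null && d > radius) {
                continue; // outside the search range
            }
            heap.offer(new double[] {d, i});
            if (heap.size() > topN) {
                heap.poll(); // drop the farthest neighbor kept so far
            }
        }
        List<double[]> result = new ArrayList<>(heap);
        result.sort((a, b) -> Double.compare(a[0], b[0]));
        return result; // each entry is {distance, candidateIndex}
    }
}
```

The same bounded-heap pattern works whatever the concrete vector and distance types are; only the distance computation changes.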




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752855026



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/EuclideanDistance.java
##
@@ -0,0 +1,272 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.appendVectorToMatrix;
+
+/**
+ * Euclidean distance is the "ordinary" straight-line distance between two 
points in Euclidean
+ * space.
+ *
+ * https://en.wikipedia.org/wiki/Euclidean_distance
+ *
+ * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| 
means the L2 norm of
+ * the vector.
+ */
+public class EuclideanDistance implements Serializable {

Review comment:
   You are right; we will do this in the future, but not now, because the kmeans 
and knn EuclideanDistance implementations have different features.
   The knn EuclideanDistance applies some optimizations, such as using matrix 
operations to accelerate the distance calculation.




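For readers, the "matrix accelerate distance calculation" optimization mentioned above usually refers to expanding ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a·b, so that per-row squared norms can be precomputed once and reused for every query. The sketch below is a hypothetical illustration of that trick, not code from the PR:

```java
/** Sketch of the norm-caching trick often used to batch Euclidean distances. */
public class BatchDistanceSketch {
    /**
     * Computes distances from one query vector to every row of a matrix using
     * ||a - b||^2 = ||a||^2 + ||b||^2 - 2 * (a . b); the row squared norms are
     * assumed to be cached by the caller instead of recomputed per query.
     */
    public static double[] distances(double[][] rows, double[] rowSquaredNorms, double[] query) {
        double querySquaredNorm = 0;
        for (double v : query) {
            querySquaredNorm += v * v;
        }
        double[] result = new double[rows.length];
        for (int i = 0; i < rows.length; i++) {
            double dot = 0;
            for (int j = 0; j < query.length; j++) {
                dot += rows[i][j] * query[j];
            }
            // Guard against tiny negative values caused by floating-point error.
            result[i] = Math.sqrt(Math.max(0, rowSquaredNorms[i] + querySquaredNorm - 2 * dot));
        }
        return result;
    }
}
```

In a real implementation the inner dot products would typically be done as one matrix-vector multiplication, which is where the speedup comes from.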




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752853592



##
File path: flink-ml-lib/pom.xml
##
@@ -106,6 +112,11 @@ under the License.
   jar
   test
 
+  
+  com.google.code.gson
+  gson
+  2.8.6
+  

Review comment:
   ReadWriteUtils may not satisfy my need; it is only a utility that helps 
serialize model data.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752853041



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/DenseMatrix.java
##
@@ -0,0 +1,161 @@
+package org.apache.flink.ml.classification.knn;
+
+import java.io.Serializable;
+
+/**
+ * Knn DenseMatrix stores dense matrix data and provides some methods to 
operate on the matrix it
+ * represents. This data structure helps knn to accelerate distance 
calculation.
+ */
+public class DenseMatrix implements Serializable {

Review comment:
   OK, I agree with you. I will move it later.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752851858



##
File path: flink-ml-api/src/main/java/org/apache/flink/ml/linalg/Vector.java
##
@@ -29,6 +29,10 @@
 /** Gets the value of the ith element. */
 double get(int i);
 
+
+/** set the value of the ith element. */
+void set(int i, double val);

Review comment:
   In some cases we just want to update the vector in place, for example to set 
a specific value of a sample. Creating a new vector may allocate new memory, 
which is not always a good thing.




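The allocation argument above can be seen in a minimal sketch. The class and method names here are illustrative, using plain double[] arrays rather than the PR's `Vector` interface:

```java
/** Sketch contrasting in-place mutation with copy-on-write style updates. */
public class VectorSetSketch {
    /** In-place update: no allocation, mutates the caller's array. */
    public static void setInPlace(double[] values, int i, double val) {
        values[i] = val;
    }

    /** Copy-based update: allocates a fresh array on every call. */
    public static double[] setByCopy(double[] values, int i, double val) {
        double[] copy = values.clone();
        copy[i] = val;
        return copy;
    }
}
```

In a tight training loop the copy-based variant allocates one array per update, which is the overhead the in-place `set` avoids.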




[jira] [Commented] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446275#comment-17446275
 ] 

Leonard Xu commented on FLINK-24958:


Thanks [~zoucao] for reporting this. I have assigned this ticket to you since you 
opened the PR.
A minor tip: the community encourages discussing first and then opening the PR.

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17582: [FLINK-24674][kubernetes] Create corresponding resouces for task manager Pods

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17582:
URL: https://github.com/apache/flink/pull/17582#issuecomment-953241058


   
   ## CI report:
   
   * 99fa2e362ef929e2549e5aa76ef2c11b7b39891d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25540)
 
   * 3eb296e77182671d24dab3c29b9d874860165725 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26737)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17582: [FLINK-24674][kubernetes] Create corresponding resouces for task manager Pods

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17582:
URL: https://github.com/apache/flink/pull/17582#issuecomment-953241058


   
   ## CI report:
   
   * 99fa2e362ef929e2549e5aa76ef2c11b7b39891d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25540)
 
   * 3eb296e77182671d24dab3c29b9d874860165725 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Assigned] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-24958:
--

Assignee: zoucao

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 





[GitHub] [flink] viirya commented on pull request #17582: [FLINK-24674][kubernetes] Create corresponding resouces for task manager Pods

2021-11-18 Thread GitBox


viirya commented on pull request #17582:
URL: https://github.com/apache/flink/pull/17582#issuecomment-973717126


   @gyfora I addressed your two previous comments. Thanks. I'll see if I can 
add a test for it.






[GitHub] [flink] flinkbot edited a comment on pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17832:
URL: https://github.com/apache/flink/pull/17832#issuecomment-973714890


   
   ## CI report:
   
   * 5b2208841608db9795796828f11c82060392a777 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26736)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] viirya commented on a change in pull request #17582: [FLINK-24674][kubernetes] Create corresponding resouces for task manager Pods

2021-11-18 Thread GitBox


viirya commented on a change in pull request #17582:
URL: https://github.com/apache/flink/pull/17582#discussion_r752848360



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -171,23 +181,25 @@ public void deregisterApplication(
 parameters.getTaskManagerCPU());
 
 final CompletableFuture createPodFuture =
-flinkKubeClient.createTaskManagerPod(taskManagerPod);
+flinkKubeClient.createTaskManagerPod(taskManagerSpec);
 
+final KubernetesPod finalTaskManagerPod = taskManagerPod;
+final String finalPodName = podName;

Review comment:
   Just tried it. This is because we can only get `taskManagerPod` and `podName` 
inside the try-catch block. If we put only `buildTaskManagerKubernetesPod` in there, 
we need to move `taskManagerSpec`, `podName`, and `taskManagerPod` out of the 
block, i.e.,
   
   ```java
   KubernetesTaskManagerSpecification taskManagerSpec = null;
   KubernetesPod taskManagerPod = null;
   String podName = null;
   try {
 taskManagerSpec = ...
 ...
   } catch (Exception e) {
 requestResourceFuture.completeExceptionally(e);
   }
   ```
   
   Due to "Variable used in lambda expression should be final or effectively 
final", I need to have copies of them.




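The "effectively final" constraint being discussed can be reduced to a small self-contained example. The names below (`podName`, `build`) are illustrative only and not the actual KubernetesResourceManagerDriver code:

```java
import java.util.function.Supplier;

/** Illustrates why locals assigned inside a try/catch need final copies for lambdas. */
public class EffectivelyFinalSketch {
    public static Supplier<String> build(boolean fail) {
        String podName = null;
        try {
            if (fail) {
                throw new IllegalStateException("build failed");
            }
            podName = "taskmanager-1";
        } catch (Exception e) {
            podName = "unknown";
        }
        // 'podName' is reassigned above, so it is NOT effectively final;
        // capturing it in the lambda directly would be a compile error,
        // hence the final copy, mirroring 'finalPodName' in the PR.
        final String finalPodName = podName;
        return () -> "pod: " + finalPodName;
    }
}
```

This is why moving the initialization into a try-catch forces the extra `finalTaskManagerPod`/`finalPodName` copies in the diff above.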




[GitHub] [flink] flinkbot commented on pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-18 Thread GitBox


flinkbot commented on pull request #17832:
URL: https://github.com/apache/flink/pull/17832#issuecomment-973715452


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 5b2208841608db9795796828f11c82060392a777 (Fri Nov 19 
04:03:53 UTC 2021)
   
   **Warnings:**
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-24958).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required. Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   






[GitHub] [flink] flinkbot commented on pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-18 Thread GitBox


flinkbot commented on pull request #17832:
URL: https://github.com/apache/flink/pull/17832#issuecomment-973714890


   
   ## CI report:
   
   * 5b2208841608db9795796828f11c82060392a777 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread zoucao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446273#comment-17446273
 ] 

zoucao commented on FLINK-24958:


cc [~Leonard]

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 





[jira] [Updated] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24958:
---
Labels: pull-request-available  (was: )

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 





[GitHub] [flink] zoucao opened a new pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-18 Thread GitBox


zoucao opened a new pull request #17832:
URL: https://github.com/apache/flink/pull/17832


   …nction documentation
   
   
   
   ## What is the purpose of the change
   
   correct the example and link for temporal table function documentation.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   






[jira] [Created] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread zoucao (Jira)
zoucao created FLINK-24958:
--

 Summary: correct the example and link for temporal table function 
documentation 
 Key: FLINK-24958
 URL: https://issues.apache.org/jira/browse/FLINK-24958
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.14.0
Reporter: zoucao
 Fix For: 1.15.0


correct the example and link for temporal table function documentation 





[GitHub] [flink] flinkbot edited a comment on pull request #17831: [FLINK-15825][Table SQL/API] Add renameDatabase() to Catalog

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17831:
URL: https://github.com/apache/flink/pull/17831#issuecomment-973310719


   
   ## CI report:
   
   * cf69d827d81b781bc21b2faf0badb552aa884415 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26726)
 
   * a7e983255a689ff59abd268ca0711d43b74a246b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26735)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17831: [FLINK-15825][Table SQL/API] Add renameDatabase() to Catalog

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17831:
URL: https://github.com/apache/flink/pull/17831#issuecomment-973310719


   
   ## CI report:
   
   * cf69d827d81b781bc21b2faf0badb552aa884415 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26726)
 
   * a7e983255a689ff59abd268ca0711d43b74a246b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752817357



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.allreduce;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Applies all-reduce on a DataStream where each partition contains only one 
double array.
+ *
+ * AllReduce is a communication primitive widely used in MPI. In this 
implementation, all workers
+ * do reduce on a partition of the whole data and they all get the final 
reduce result. In detail,
+ * we split each double array into pieces of fixed size buffer (4KB by 
default) and let each subtask
+ * handle several pieces.
+ *
+ * There're mainly three stages:
+ * 1. All workers send their partial data to other workers for reduce.
+ * 2. All workers do reduce on all data it received and then send partial 
results to others.
+ * 3. All workers merge partial results into final result.
+ */
+public class AllReduceUtils {
+
+private static final int TRANSFER_BUFFER_SIZE = 1024 * 4;
+
+/**
+ * Applies allReduce on the input data stream. The input data stream is 
supposed to contain one
+ * double array in each partition. The result data stream has the same 
parallelism as the input,
+ * where each partition contains one double array that sums all of the 
double arrays in the
+ * input data stream.
+ *
+ * Note that we throw exception when one of the following two cases 
happen:
+ * 1. There exists one partition that contains more than one double 
array.
+ * 2. The length of double array is not consistent among all 
partitions.
+ *
+ * @param input The input data stream.
+ * @return The result data stream.
+ */
+public static DataStream allReduce(DataStream input) {
+// taskId, pieceId, totalElements, partitionedArray
+DataStream> allReduceSend =
+input.transform(
+"all-reduce-send",
+new TupleTypeInfo<>(
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+new AllReduceSend())
+.name("all-reduce-send");
+
+// taskId, pieceId, totalElements, partitionedArray
+DataStream> allReduceSum =
+allReduceSend
+.partitionCustom((key, numPartitions) -> key % 
numPartitions, x -> x.f1)
+.transform(
+"all-reduce-sum",
+new TupleTypeInfo<>(
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+new AllReduceSum())
+.name("all-reduce-sum");
+
+return allReduceSum
+.partitionCustom((key, 
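
The three stages described in the AllReduceUtils javadoc above can be sketched as a local, single-process simulation. This is a toy model of the chunked reduce, not the actual Flink operator wiring; the chunk size and the piece-ownership rule (`pieceId % numWorkers`) are simplifying assumptions:

```java
import java.util.Arrays;

public class AllReduceSketch {
    static final int CHUNK = 4; // elements per piece (the real impl uses a 4KB buffer)

    // Each of the n "workers" holds one double[] of equal length; after
    // all-reduce every worker holds the element-wise sum of all arrays.
    public static double[][] allReduce(double[][] workerData) {
        int n = workerData.length;
        int len = workerData[0].length;
        for (double[] part : workerData) {
            if (part.length != len) {
                throw new IllegalArgumentException(
                        "array length must be consistent across partitions");
            }
        }
        double[] sum = new double[len];
        // Stages 1-2: each fixed-size piece is owned by one worker, which
        // reduces that piece across the partial data from all inputs.
        for (int start = 0; start < len; start += CHUNK) {
            int end = Math.min(start + CHUNK, len);
            for (double[] part : workerData) {
                for (int i = start; i < end; i++) {
                    sum[i] += part[i];
                }
            }
        }
        // Stage 3: every worker merges the partial results, so each
        // partition ends up with the same summed array.
        double[][] out = new double[n][];
        for (int w = 0; w < n; w++) {
            out[w] = Arrays.copyOf(sum, len);
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] in = {{1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}};
        System.out.println(Arrays.toString(allReduce(in)[0]));
        // -> [11.0, 22.0, 33.0, 44.0, 55.0]
    }
}
```

In the real operator graph, the per-piece reduce and the final merge are separate `transform` steps connected by `partitionCustom` on the piece id, as the snippet under review shows.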

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #24: Flink 24557

2021-11-18 Thread GitBox


yunfengzhou-hub commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r752833908



##
File path: flink-ml-lib/pom.xml
##
@@ -106,6 +112,11 @@ under the License.
   jar
   test
 
+  
+  com.google.code.gson
+  gson
+  2.8.6
+  

Review comment:
   `ReadWriteUtils` has provided methods to generate json. Shall we reuse 
the existing method and avoid adding new dependencies?

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnUtil.java
##
@@ -0,0 +1,428 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.StringUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+
+/** Utility class for the knn algorithm. */
+public class KnnUtil {

Review comment:
   Methods in this class could be placed in classes like `TableUtils` and 
`VectorUtils`.

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/EuclideanDistance.java
##
@@ -0,0 +1,272 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.appendVectorToMatrix;
+
+/**
+ * Euclidean distance is the "ordinary" straight-line distance between two 
points in Euclidean
+ * space.
+ *
+ * https://en.wikipedia.org/wiki/Euclidean_distance
+ *
+ * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| 
means the L2 norm of
+ * the vector.
+ */
+public class EuclideanDistance implements Serializable {

Review comment:
   The design of an independent `EuclideanDistance` differs from both 
Alink (which implements `FastDistance`) and Spark (which does not have such a 
class). Flink ML also has `EuclideanDistanceMeasure`, which might help achieve 
this functionality. It might be better to introduce distance classes 
following existing or discussed conventions.
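
For reference, the ||a - b|| definition quoted in the javadoc above reduces to the following standalone sketch (independent of any of the distance classes discussed; the class name is illustrative):

```java
public class EuclideanDistanceSketch {
    // ||a - b||: square root of the summed squared element-wise differences.
    public static double distance(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("vectors must have the same dimension");
        }
        double squaredSum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            squaredSum += d * d;
        }
        return Math.sqrt(squaredSum);
    }

    public static void main(String[] args) {
        // 3-4-5 triangle: distance between (0,0) and (3,4) is 5
        System.out.println(distance(new double[]{0, 0}, new double[]{3, 4}));
    }
}
```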

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,223 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.castTo;
+import static org.apache.flink.ml.classification.knn.KnnUtil.serializeResult;
+import static org.apache.flink.ml.classification.knn.KnnUtil.updateQueue;
+
+/** knn model data, which will be used to calculate the distances between 
nodes. */
+public class KnnModelData implements Serializable, Cloneable {
+private static final long serialVersionUID = -2940551481683238630L;
+private final List dictData;
+private final EuclideanDistance fastDistance;
+protected Comparator> comparator;
+private DataType idType;
+
+/**
+ * constructor.
+ *
+ * @param list BaseFastDistanceData list.
+ * @param fastDistance used to accelerate the speed of 
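
The `PriorityQueue`-based neighbor selection that `KnnModelData` relies on (via `updateQueue`) can be illustrated with a minimal top-k sketch. Names here are illustrative, not the actual Flink ML helpers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class KnnQueueSketch {
    // Keeps the k smallest distances with a max-heap of size k: the head
    // is the worst candidate so far and is evicted when a closer one arrives.
    public static List<Double> kSmallest(double[] distances, int k) {
        PriorityQueue<Double> queue = new PriorityQueue<>((a, b) -> Double.compare(b, a));
        for (double d : distances) {
            if (queue.size() < k) {
                queue.add(d);
            } else if (d < queue.peek()) {
                queue.poll();
                queue.add(d);
            }
        }
        List<Double> result = new ArrayList<>(queue);
        result.sort(Double::compare);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(kSmallest(new double[]{4.2, 0.5, 3.1, 1.7, 2.9}, 3));
        // -> [0.5, 1.7, 2.9]
    }
}
```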

[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * ccf96fb549f1917fe888d69466a7e10013aa76ca Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26717)
 
   * 12bc9ca2640c7773b2ca7fa50e204605183ff309 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26734)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24897) Enable application mode on YARN to use usrlib

2021-11-18 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446258#comment-17446258
 ] 

Biao Geng commented on FLINK-24897:
---

Hi [~trohrmann] , would you mind commenting on whether we have the guarantee 
that jars under {{usrlib}} will always be loaded by the user classloader? 
Besides, after the discussion with Yang above, the current solution is:
1. {{usrlib}} will be shipped automatically if it exists.
2. If {{usrlib}} is added to the ship files again, an exception will be thrown.
3. {{usrlib}} will work for both per-job and application mode.
4. Jars in {{usrlib}} will be loaded by the user classloader only when 
UserJarInclusion is DISABLED. In other cases, the AppClassLoader will be used.

Thanks.

> Enable application mode on YARN to use usrlib
> -
>
> Key: FLINK-24897
> URL: https://issues.apache.org/jira/browse/FLINK-24897
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Biao Geng
>Priority: Major
>
> Hi there, 
> I am working to utilize application mode to submit flink jobs to YARN cluster 
> but I find that currently there is no easy way to ship my user-defined 
> jars(e.g. some custom connectors or udf jars that would be shared by some 
> jobs) and ask the FlinkUserCodeClassLoader to load classes in these jars. 
> I checked some relevant jiras, like  FLINK-21289. In k8s mode, there is a 
> solution that users can use `usrlib` directory to store their user-defined 
> jars and these jars would be loaded by FlinkUserCodeClassLoader when the job 
> is executed on JM/TM.
> But on YARN mode, `usrlib` does not work as that:
> In this method(org.apache.flink.yarn.YarnClusterDescriptor#addShipFiles), if 
> I want to use `yarn.ship-files` to ship `usrlib` from my flink client(in my 
> local machine) to remote cluster, I must not set  UserJarInclusion to 
> DISABLED due to the checkArgument(). However, if I do not set that option to 
> DISABLED, the user jars to be shipped will be added into systemClassPaths. As 
> a result, classes in those user jars will be loaded by AppClassLoader. 
> But if I do not ship these jars, there is no convenient way to utilize these 
> jars in my flink run command. Currently, all I can do seems to use `-C` 
> option, which means I have to upload my jars to some shared store first and 
> then use these remote paths. It is not so perfect as we have already make it 
> possible to ship jars or files directly and we also introduce `usrlib` in 
> application mode on YARN. It would be more user-friendly if we can allow 
> shipping `usrlib` from local to remote cluster while using 
> FlinkUserCodeClassLoader to load classes in the jars in `usrlib`.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * ccf96fb549f1917fe888d69466a7e10013aa76ca Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26717)
 
   * 12bc9ca2640c7773b2ca7fa50e204605183ff309 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on pull request #24: Flink 24557

2021-11-18 Thread GitBox


weibozhao commented on pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#issuecomment-973691985


   I have updated to the latest code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * ccf96fb549f1917fe888d69466a7e10013aa76ca Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26717)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * ccf96fb549f1917fe888d69466a7e10013aa76ca Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26717)
 
   * 12bc9ca2640c7773b2ca7fa50e204605183ff309 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-24957) show `host:port` information in `subtasks` tab

2021-11-18 Thread Xianxun Ye (Jira)
Xianxun Ye created FLINK-24957:
--

 Summary: show `host:port` information in `subtasks` tab
 Key: FLINK-24957
 URL: https://issues.apache.org/jira/browse/FLINK-24957
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.14.0
Reporter: Xianxun Ye
 Attachments: image-2021-11-19-10-45-41-395.png

Help users locate the container and find its logs through the subtask id when 
there are multiple containers running on the same host.

 

!image-2021-11-19-10-45-41-395.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-24835) "group by" in the interval join will throw a exception

2021-11-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-24835.
--
Resolution: Fixed

Fixed in 
  1.15.0: 8aa74d5ad7026734bdd98eabbc9cbbb243bbe8b0
  1.14.1: d3df986a75e34e1ed475b2e1236b7770698e7bd1

> "group by" in the interval join will throw a exception
> --
>
> Key: FLINK-24835
> URL: https://issues.apache.org/jira/browse/FLINK-24835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: Jing Zhang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> Can reproduce this bug by the following code added into 
> IntervalJoinTest.scala:
> {code:java}
> @Test
> def testSemiIntervalJoinWithSimpleConditionAndGroup(): Unit = {
>   val sql =
> """
>   |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
>   | SELECT t2.a FROM MyTable2 t2
>   |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime 
> + INTERVAL '5' MINUTE
>   |   GROUP BY t2.a
>   |)
> """.stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The exception is :
> {code:java}
> java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>     at 
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:64)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:64)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at 

[jira] [Commented] (FLINK-24951) Allow watch bookmarks to mitigate frequent watcher rebuilding

2021-11-18 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446253#comment-17446253
 ] 

Yang Wang commented on FLINK-24951:
---

I am afraid some old K8s versions do not support bookmarks, and I am not quite 
sure what would happen in that case.

Bookmarks[1] could really help decrease the "too old resource version" errors 
when watching TaskManager pods, because there are no changes to these 
TaskManager pods after they are launched successfully.

If there are no compatibility issues, I am in favor of enabling bookmarks by 
default.

 

[1]. https://stackoverflow.com/questions/66080942/what-k8s-bookmark-solves
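
The effect discussed here can be illustrated with a toy model (the names and the retention window are invented for illustration; this is not the Kubernetes client API):

```java
public class WatchBookmarkSketch {
    static final int HISTORY = 5; // server retains only the last N versions

    int globalVersion = 0;

    // Any change anywhere in the cluster bumps the global resource version.
    void clusterChange() {
        globalVersion++;
    }

    // Resuming a watch fails with "too old resource version" if the
    // client's last seen version has been compacted away.
    boolean canResume(int lastSeenVersion) {
        return lastSeenVersion >= globalVersion - HISTORY;
    }

    public static void main(String[] args) {
        WatchBookmarkSketch server = new WatchBookmarkSketch();
        int withoutBookmarks = 0; // TM pods never change -> no events seen
        int withBookmarks = 0;
        for (int i = 1; i <= 100; i++) {
            server.clusterChange(); // unrelated pods churning
            // With bookmarks enabled, the server periodically reports the
            // current version even when no relevant events occur.
            if (i % 2 == 0) {
                withBookmarks = server.globalVersion;
            }
        }
        System.out.println(server.canResume(withoutBookmarks)); // false
        System.out.println(server.canResume(withBookmarks));    // true
    }
}
```

The watcher whose last seen version was kept fresh by bookmark events can resume without a full rebuild, which is exactly the mitigation proposed in the issue.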

 

> Allow watch bookmarks to mitigate frequent watcher rebuilding
> -
>
> Key: FLINK-24951
> URL: https://issues.apache.org/jira/browse/FLINK-24951
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: Yangze Guo
>Priority: Major
> Fix For: 1.15.0
>
>
> In some production environments, massive numbers of pods are created and 
> deleted. Thus the global resource version is updated very quickly and may 
> cause frequent watcher rebuilding because of "too old resource version". To 
> avoid this, K8s provides a Bookmark mechanism[1].
> I propose to enable bookmark by default
> [1] 
> https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752825779



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.allreduce;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Applies all-reduce on a DataStream where each partition contains only one 
double array.
+ *
+ * AllReduce is a communication primitive widely used in MPI. In this 
implementation, all workers
+ * do reduce on a partition of the whole data and they all get the final 
reduce result. In detail,
+ * we split each double array into pieces of fixed size buffer (4KB by 
default) and let each subtask
+ * handle several pieces.
+ *
+ * There're mainly three stages:
+ * 1. All workers send their partial data to other workers for reduce.
+ * 2. All workers do reduce on all data it received and then send partial 
results to others.
+ * 3. All workers merge partial results into final result.
+ */
+public class AllReduceUtils {
+
+private static final int TRANSFER_BUFFER_SIZE = 1024 * 4;
+
+/**
+ * Applies allReduce on the input data stream. The input data stream is 
supposed to contain one
+ * double array in each partition. The result data stream has the same 
parallelism as the input,
+ * where each partition contains one double array that sums all of the 
double arrays in the
+ * input data stream.
+ *
+ * Note that we throw exception when one of the following two cases 
happen:
+ * 1. There exists one partition that contains more than one double 
array.
+ * 2. The length of double array is not consistent among all 
partitions.
+ *
+ * @param input The input data stream.
+ * @return The result data stream.
+ */
+public static DataStream allReduce(DataStream input) {
+// taskId, pieceId, totalElements, partitionedArray
+DataStream> allReduceSend =
+input.transform(
+"all-reduce-send",
+new TupleTypeInfo<>(
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+new AllReduceSend())
+.name("all-reduce-send");
+
+// taskId, pieceId, totalElements, partitionedArray
+DataStream> allReduceSum =
+allReduceSend
+.partitionCustom((key, numPartitions) -> key % 
numPartitions, x -> x.f1)
+.transform(
+"all-reduce-sum",
+new TupleTypeInfo<>(
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+new AllReduceSum())
+.name("all-reduce-sum");
+
+return allReduceSum
+.partitionCustom((key, 

[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752817357



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.allreduce;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Applies all-reduce on a DataStream where each partition contains only one double array.
+ *
+ * <p>AllReduce is a communication primitive widely used in MPI. In this implementation, all workers
+ * do reduce on a partition of the whole data and they all get the final reduce result. In detail,
+ * we split each double array into pieces of fixed size buffer (4KB by default) and let each subtask
+ * handle several pieces.
+ *
+ * <p>There're mainly three stages:
+ * 1. All workers send their partial data to other workers for reduce.
+ * 2. All workers do reduce on all data it received and then send partial results to others.
+ * 3. All workers merge partial results into final result.
+ */
+public class AllReduceUtils {
+
+    private static final int TRANSFER_BUFFER_SIZE = 1024 * 4;
+
+    /**
+     * Applies allReduce on the input data stream. The input data stream is supposed to contain one
+     * double array in each partition. The result data stream has the same parallelism as the input,
+     * where each partition contains one double array that sums all of the double arrays in the
+     * input data stream.
+     *
+     * <p>Note that we throw exception when one of the following two cases happen:
+     * 1. There exists one partition that contains more than one double array.
+     * 2. The length of double array is not consistent among all partitions.
+     *
+     * @param input The input data stream.
+     * @return The result data stream.
+     */
+    public static DataStream<double[]> allReduce(DataStream<double[]> input) {
+        // taskId, pieceId, totalElements, partitionedArray
+        DataStream<Tuple4<Integer, Integer, Integer, double[]>> allReduceSend =
+                input.transform(
+                                "all-reduce-send",
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+                                new AllReduceSend())
+                        .name("all-reduce-send");
+
+        // taskId, pieceId, totalElements, partitionedArray
+        DataStream<Tuple4<Integer, Integer, Integer, double[]>> allReduceSum =
+                allReduceSend
+                        .partitionCustom((key, numPartitions) -> key % numPartitions, x -> x.f1)
+                        .transform(
+                                "all-reduce-sum",
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+                                new AllReduceSum())
+                        .name("all-reduce-sum");
+
+        return allReduceSum
+                .partitionCustom((key, 
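The three-stage scheme described in the Javadoc above can be simulated outside Flink. The sketch below is a plain-Python illustration only, not the operator implementation from the PR; the tiny `PIECE_SIZE` stands in for the 4KB `TRANSFER_BUFFER_SIZE`, and all names are made up for the example.

```python
# Minimal simulation of chunked all-reduce: every worker holds one double
# array, the array is split into fixed-size pieces, each piece is summed
# across workers, and every worker receives the same merged result.
PIECE_SIZE = 2  # stand-in for the 4KB TRANSFER_BUFFER_SIZE

def all_reduce(arrays):
    length = len(arrays[0])
    if any(len(a) != length for a in arrays):
        raise ValueError("The length of double array is not consistent.")
    num_pieces = (length + PIECE_SIZE - 1) // PIECE_SIZE
    # Stages 1-2: reduce each piece (in Flink, the subtask keyed by
    # pieceId % numPartitions performs this sum).
    reduced = []
    for piece in range(num_pieces):
        start = piece * PIECE_SIZE
        end = min(start + PIECE_SIZE, length)
        reduced.append([sum(a[i] for a in arrays) for i in range(start, end)])
    # Stage 3: broadcast and merge the partial results on every worker.
    merged = [x for piece in reduced for x in piece]
    return [list(merged) for _ in arrays]

print(all_reduce([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]))
```

Each returned array is identical, which is the post-condition the operator's tests would check per partition.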

[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752825161



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.allreduce;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Applies all-reduce on a DataStream where each partition contains only one double array.
+ *
+ * <p>AllReduce is a communication primitive widely used in MPI. In this implementation, all workers
+ * do reduce on a partition of the whole data and they all get the final reduce result. In detail,
+ * we split each double array into pieces of fixed size buffer (4KB by default) and let each subtask
+ * handle several pieces.
+ *
+ * <p>There're mainly three stages:
+ * 1. All workers send their partial data to other workers for reduce.
+ * 2. All workers do reduce on all data it received and then send partial results to others.
+ * 3. All workers merge partial results into final result.
+ */
+public class AllReduceUtils {
+
+    private static final int TRANSFER_BUFFER_SIZE = 1024 * 4;
+
+    /**
+     * Applies allReduce on the input data stream. The input data stream is supposed to contain one
+     * double array in each partition. The result data stream has the same parallelism as the input,
+     * where each partition contains one double array that sums all of the double arrays in the
+     * input data stream.
+     *
+     * <p>Note that we throw exception when one of the following two cases happen:
+     * 1. There exists one partition that contains more than one double array.
+     * 2. The length of double array is not consistent among all partitions.
+     *
+     * @param input The input data stream.
+     * @return The result data stream.
+     */
+    public static DataStream<double[]> allReduce(DataStream<double[]> input) {
+        // taskId, pieceId, totalElements, partitionedArray
+        DataStream<Tuple4<Integer, Integer, Integer, double[]>> allReduceSend =
+                input.transform(
+                                "all-reduce-send",
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+                                new AllReduceSend())
+                        .name("all-reduce-send");
+
+        // taskId, pieceId, totalElements, partitionedArray
+        DataStream<Tuple4<Integer, Integer, Integer, double[]>> allReduceSum =
+                allReduceSend
+                        .partitionCustom((key, numPartitions) -> key % numPartitions, x -> x.f1)
+                        .transform(
+                                "all-reduce-sum",
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        BasicTypeInfo.INT_TYPE_INFO,
+                                        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+                                new AllReduceSum())
+                        .name("all-reduce-sum");
+
+        return allReduceSum
+                .partitionCustom((key, 
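The `partitionCustom((key, numPartitions) -> key % numPartitions, x -> x.f1)` call quoted above routes each chunk to a subtask by its piece id. The helper below (illustrative values, not flink-ml code) shows which subtask ends up handling which pieces for a given array length and buffer size.

```python
# Round-robin assignment of buffer pieces to subtasks, mirroring the
# pieceId % numPartitions keying used by the quoted partitionCustom call.
def pieces_per_subtask(array_len, piece_size, num_subtasks):
    num_pieces = (array_len + piece_size - 1) // piece_size  # ceil division
    assignment = {i: [] for i in range(num_subtasks)}
    for piece_id in range(num_pieces):
        assignment[piece_id % num_subtasks].append(piece_id)
    return assignment

print(pieces_per_subtask(array_len=10, piece_size=4, num_subtasks=2))
```

With 10 elements and pieces of 4 there are 3 pieces, so subtask 0 handles two of them and subtask 1 handles one, which is the skew the modulo keying accepts in exchange for simplicity.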

[GitHub] [flink] flinkbot edited a comment on pull request #17637: [FLINK-24708][planner] Fix wrong results of the IN operator

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17637:
URL: https://github.com/apache/flink/pull/17637#issuecomment-957122792


   
   ## CI report:
   
   * 6a53898a0533f50a3af56be193cdf248cdea6332 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26663)
 
   * d50d09006bb41b1c8ab87b868aa796637c27cce6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26733)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17637: [FLINK-24708][planner] Fix wrong results of the IN operator

2021-11-18 Thread GitBox


flinkbot edited a comment on pull request #17637:
URL: https://github.com/apache/flink/pull/17637#issuecomment-957122792


   
   ## CI report:
   
   * 6a53898a0533f50a3af56be193cdf248cdea6332 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26663)
 
   * d50d09006bb41b1c8ab87b868aa796637c27cce6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752820431



##
File path: flink-ml-python/apache_flink_ml/ml/api/core.py
##
@@ -0,0 +1,221 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from abc import ABC, abstractmethod
+from typing import TypeVar, Generic, List
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import Table
+
+from apache_flink_ml.ml.param.param import WithParams
+
+T = TypeVar('T')
+E = TypeVar('E')
+M = TypeVar('M')
+
+
+class Stage(WithParams[T], ABC):
+"""
+Base class for a node in a Pipeline or Graph. The interface is only a 
concept, and does not have
+any actual functionality. Its subclasses could be Estimator, Model, 
Transformer or AlgoOperator.
+No other classes should inherit this interface directly.
+
+Each stage is with parameters, and requires a public empty constructor for 
restoration.
+"""
+
+@abstractmethod
+def save(self, path: str) -> None:
+"""
+Saves this stage to the given path.
+"""
+pass
+
+@classmethod
+@abstractmethod
+def load(cls, env: StreamExecutionEnvironment, path: str) -> T:
+"""
+Instantiates a new stage instance based on the data read from the 
given path.
+"""
+pass
+
+
+class AlgoOperator(Stage[T], ABC):
+"""
+An AlgoOperator takes a list of tables as inputs and produces a list of 
tables as results. It
+can be used to encode generic multi-input multi-output computation logic.
+"""
+
+@abstractmethod
+def transform(self, *inputs: Table) -> List[Table]:
+"""
+Applies the AlgoOperator on the given input tables and returns the 
result tables.
+
+:param inputs: A list of tables.
+:return: A list of tables.
+"""
+pass
+
+
+class Transformer(AlgoOperator[T], ABC):
+"""
+A Transformer is an AlgoOperator with the semantic difference that it 
encodes the Transformation
+logic, such that a record in the output typically corresponds to one 
record in the input. In
+contrast, an AlgoOperator is a better fit to express aggregation logic 
where a record in the
+output could be computed from an arbitrary number of records in the input.
+"""
+pass
+
+
+class Model(Transformer[T], ABC):
+"""
+A Model is typically generated by invoking :func:`~Estimator.fit`. A Model 
is a Transformer with
+the extra APIs to set and get model data.
+"""
+
+def set_model_data(self, *inputs: Table) -> None:
+raise Exception("This operation is not supported.")
+
+def get_model_data(self) -> None:
+"""
+Gets a list of tables representing the model data. Each table could be 
an unbounded stream
+of model data changes.
+
+:return: A list of tables.
+"""
+raise Exception("This operation is not supported.")
+
+
+class Estimator(Generic[E, M], Stage[E], ABC):
+"""
+Estimators are responsible for training and generating Models.
+"""
+
+def fit(self, *inputs: Table) -> Model[M]:
+"""
+Trains on the given inputs and produces a Model.
+
+:param inputs: A list of tables.
+:return: A Model.
+"""
+pass
+
+
+class PipelineModel(Model):
+"""
+A PipelineModel acts as a Model. It consists of an ordered list of stages, 
each of which could
+be a Model, Transformer or AlgoOperator.
+"""
+
+def __init__(self, stages: List[Stage]):
+self._stages = stages
+
+def transform(self, *inputs: Table) -> List[Table]:
+"""
+Applies all stages in this PipelineModel on the input tables in order. 
The output of one
+stage is used as the input of the next stage (if any). The output of 
the last stage is
+returned as the result of this method.
+
+:param inputs: A list of tables.
+:return: A list of tables.
+"""
+for stage in self._stages:
+
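The `PipelineModel.transform` contract quoted above — each stage's output feeds the next stage, and the last stage's output is returned — can be sketched without Flink. Stages here are plain callables rather than Transformers; the names are illustrative only.

```python
# Plain-Python sketch of PipelineModel.transform: stages run in order and
# the output of one stage becomes the input of the next.
def pipeline_transform(stages, *inputs):
    for stage in stages:
        inputs = stage(*inputs)  # outputs of this stage feed the next
    return list(inputs)

def double(*tables):
    return [t * 2 for t in tables]

def increment(*tables):
    return [t + 1 for t in tables]

print(pipeline_transform([double, increment], 1, 2))  # -> [3, 5]
```

An empty stage list degenerates to the identity, returning the inputs unchanged, which matches the chaining semantics described in the docstring.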

[GitHub] [flink] xuyangzhong commented on a change in pull request #17771: [FLINK-24813][table-planner]Improve ImplicitTypeConversionITCase

2021-11-18 Thread GitBox


xuyangzhong commented on a change in pull request #17771:
URL: https://github.com/apache/flink/pull/17771#discussion_r752820224



##
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
##
@@ -302,23 +145,6 @@ class SqlExpressionTest extends ExpressionTestBase {
 // Decimal(2,1) / Decimal(10,0) => Decimal(23,12)
 testSqlApi("2.0/(-3)", "-0.6667")
 testSqlApi("-7.9/2", "-3.9500")
-
-// invalid division
-val divisorZeroException = "Division by zero"
-testExpectedSqlException(
-  "1/cast(0.00 as decimal)", divisorZeroException, 
classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/cast(0.00 as double)", divisorZeroException, 
classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/cast(0.00 as float)", divisorZeroException, 
classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/cast(0 as tinyint)", divisorZeroException, 
classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/cast(0 as smallint)", divisorZeroException, 
classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/0", divisorZeroException, classOf[ArithmeticException])
-testExpectedSqlException(
-  "1/cast(0 as bigint)", divisorZeroException, 
classOf[ArithmeticException])

Review comment:
   My mistake, I'll revert them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
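The assertions removed in the diff above pinned down the "Division by zero" error for each CAST target. As a purely illustrative analogue (Python, not the planner), division by zero surfaces as an `ArithmeticError` subclass for ints and decimals alike:

```python
from decimal import Decimal

# Division by zero raises ZeroDivisionError (a subclass of ArithmeticError)
# for ints and Decimals; the removed Scala tests assert the planner's
# ArithmeticException equivalent for each CAST target.
def divides_by_zero(numerator, denominator):
    try:
        numerator / denominator
        return False
    except ArithmeticError:
        return True

print(divides_by_zero(1, 0), divides_by_zero(Decimal(1), Decimal(0)))
```

A successful division such as `1 / 2` simply returns `False`, so the helper can be used as a drop-in predicate in a test loop over cast targets.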




[jira] [Comment Edited] (FLINK-24951) Allow watch bookmarks to mitigate frequent watcher rebuilding

2021-11-18 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446247#comment-17446247
 ] 

Yangze Guo edited comment on FLINK-24951 at 11/19/21, 2:33 AM:
---

With Fabric8 5.5.0, we can enable this feature, but it cannot handle BOOKMARK 
events until 5.9.0[1]. So, we may need to upgrade k8s client as well.

Would bookmark introduce any regression for users? If so, +1 for not enabling 
it by default.

[1] https://github.com/fabric8io/kubernetes-client/pull/3488


was (Author: JIRAUSER280074):
Fabric8 5.5.0 can enable this feature, but it cannot handle BOOKMARK events 
until 5.9.0[1]. So, we may need to upgrade k8s client as well.

Would bookmark introduce any regression for users? If so, +1 for not enabling 
it by default.

[1] https://github.com/fabric8io/kubernetes-client/pull/3488

> Allow watch bookmarks to mitigate frequent watcher rebuilding
> -
>
> Key: FLINK-24951
> URL: https://issues.apache.org/jira/browse/FLINK-24951
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: Yangze Guo
>Priority: Major
> Fix For: 1.15.0
>
>
> In some production environments, a massive number of pods are created and
> deleted. Thus the global resource version is updated very quickly and may
> cause frequent watcher rebuilding because of "too old resource version"
> errors. To avoid this, K8s provides a Bookmark mechanism[1].
> I propose to enable bookmarks by default.
> [1] 
> https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
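A BOOKMARK event carries only a fresh resourceVersion, letting a client restart its watch without replaying events or relisting. The sketch below uses simulated events (no real Kubernetes, fabric8, or Flink client code) to show the bookkeeping that makes "too old resource version" restarts cheap.

```python
# Simulated watch loop: BOOKMARK events advance the resourceVersion used to
# re-establish the watch, but carry no object changes to handle.
def consume(events, start_rv):
    last_rv, handled = start_rv, []
    for etype, rv, obj in events:
        last_rv = rv              # always remember the newest resourceVersion
        if etype != "BOOKMARK":   # bookmarks produce no pod events
            handled.append((etype, obj))
    return last_rv, handled

events = [("ADDED", "101", "pod-a"),
          ("BOOKMARK", "250", None),
          ("DELETED", "251", "pod-a")]
print(consume(events, "100"))
```

Without the bookmark, a watcher restarting after the ADDED event would hold resourceVersion "101", which the apiserver may have already compacted; with it, the watcher restarts from "250" or later.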


[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752819246



##
File path: flink-ml-python/apache_flink_ml/ml/tests/test_stage.py
##
@@ -0,0 +1,211 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+from typing import Dict, Any
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+from apache_flink_ml.ml.api.core import T, Stage
+from apache_flink_ml.ml.param.param import ParamValidators, Param, 
BooleanParam, IntParam, \
+FloatParam, StringParam, IntArrayParam, FloatArrayParam, StringArrayParam
+
+BOOLEAN_PARAM = BooleanParam("boolean_param", "Description", False)
+INT_PARAM = IntParam("int_param", "Description", 1, ParamValidators.lt(100))
+FLOAT_PARAM = FloatParam("float_param", "Description", 3.0, 
ParamValidators.lt(100))
+STRING_PARAM = StringParam('string_param', "Description", "5")
+INT_ARRAY_PARAM = IntArrayParam("int_array_param", "Description", [6, 7])
+FLOAT_ARRAY_PARAM = FloatArrayParam("float_array_param", "Description", [10.0, 
11.0])
+STRING_ARRAY_PARAM = StringArrayParam("string_array_param", "Description", 
["14", "15"])
+EXTRA_INT_PARAM = IntParam("extra_int_param",
+   "Description",
+   20,
+   ParamValidators.always_true())
+PARAM_WITH_NONE_DEFAULT = IntParam("param_with_none_default",
+   "Must be explicitly set with a non-none 
value",
+   None,
+   ParamValidators.not_null())
+
+
+class StageTest(unittest.TestCase):
+def setUp(self):
+self.env = StreamExecutionEnvironment.get_execution_environment()
+self.t_env = StreamTableEnvironment.create(self.env)
+
self.t_env.get_config().get_configuration().set_string("parallelism.default", 
"2")
+self.temp_dir = tempfile.mkdtemp()
+
+def tearDown(self) -> None:
+shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+def test_param_set_value_with_name(self):
+stage = MyStage()
+stage.set(INT_PARAM, 2)
+self.assertEqual(2, stage.get(INT_PARAM))
+
+param = stage.get_param("int_param")
+stage.set(param, 3)
+self.assertEqual(3, stage.get(param))
+
+param = stage.get_param('extra_int_param')
+stage.set(param, 50)
+self.assertEqual(50, stage.get(param))
+
+def test_param_with_null_default(self):
+stage = MyStage()
+import pytest
+with pytest.raises(ValueError, match='value should not be None'):
+stage.get(PARAM_WITH_NONE_DEFAULT)
+
+stage.set(PARAM_WITH_NONE_DEFAULT, 3)
+self.assertEqual(3, stage.get(PARAM_WITH_NONE_DEFAULT))
+
+def test_param_set_invalid_value(self):
+stage = MyStage()
+import pytest
+
+with pytest.raises(ValueError, match='Parameter int_param is given an 
invalid value 100.'):
+stage.set(INT_PARAM, 100)
+
+with pytest.raises(ValueError,
+   match='Parameter float_param is given an invalid 
value 100.0.'):
+stage.set(FLOAT_PARAM, 100.0)
+
+with pytest.raises(TypeError,
+   match="Parameter int_param's type <class 'int'> is 
incompatible with "
+ "the type of <class 'str'>"):
+stage.set(INT_PARAM, "100")
+
+with pytest.raises(TypeError,
+   match="Parameter string_param's type <class 'str'> 
is incompatible with"
+ " the type of <class 'int'>"):
+stage.set(STRING_PARAM, 100)
+
+def test_param_set_valid_value(self):
+stage = MyStage()
+
+stage.set(BOOLEAN_PARAM, True)
+self.assertTrue(stage.get(BOOLEAN_PARAM))
+
+stage.set(INT_PARAM, 50)
+self.assertEqual(50, stage.get(INT_PARAM))
+
+stage.set(FLOAT_PARAM, 50.0)
+self.assertEqual(50.0, 
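The tests quoted above exercise typed parameters with validators such as `ParamValidators.lt(100)`. A stripped-down version of that pattern — class and method names are illustrative, not the flink-ml-python API — looks like:

```python
# Minimal typed parameter with a validator, mirroring the
# IntParam("int_param", ..., ParamValidators.lt(100)) pattern in the tests.
class IntParam:
    def __init__(self, name, default, validator=lambda v: True):
        self.name, self.default, self.validator = name, default, validator

def lt(bound):
    """Validator factory: accepts values strictly less than bound."""
    return lambda value: value < bound

class Stage:
    def __init__(self):
        self._values = {}

    def set(self, param, value):
        if not isinstance(value, int):
            raise TypeError(f"Parameter {param.name} expects an int")
        if not param.validator(value):
            raise ValueError(
                f"Parameter {param.name} is given an invalid value {value}.")
        self._values[param.name] = value
        return self

    def get(self, param):
        return self._values.get(param.name, param.default)

INT_PARAM = IntParam("int_param", 1, lt(100))
stage = Stage().set(INT_PARAM, 50)
print(stage.get(INT_PARAM))  # -> 50
```

Setting `100` trips the `lt(100)` validator and raises `ValueError`, and an unset parameter falls back to its default, which is the behaviour the quoted `test_param_set_invalid_value` and `test_param_set_valid_value` cases assert.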

[jira] [Commented] (FLINK-24951) Allow watch bookmarks to mitigate frequent watcher rebuilding

2021-11-18 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446247#comment-17446247
 ] 

Yangze Guo commented on FLINK-24951:


Fabric8 5.5.0 can enable this feature, but it cannot handle BOOKMARK events 
until 5.9.0[1]. So, we may need to upgrade k8s client as well.

Would bookmark introduce any regression for users? If so, +1 for not enabling 
it by default.

[1] https://github.com/fabric8io/kubernetes-client/pull/3488

> Allow watch bookmarks to mitigate frequent watcher rebuilding
> -
>
> Key: FLINK-24951
> URL: https://issues.apache.org/jira/browse/FLINK-24951
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: Yangze Guo
>Priority: Major
> Fix For: 1.15.0
>
>
> In some production environments, a massive number of pods are created and
> deleted. Thus the global resource version is updated very quickly and may
> cause frequent watcher rebuilding because of "too old resource version"
> errors. To avoid this, K8s provides a Bookmark mechanism[1].
> I propose to enable bookmarks by default.
> [1] 
> https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752817357



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##

[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #30: [FLINK-24845] Add allreduce utility function in FlinkML

2021-11-18 Thread GitBox


zhipeng93 commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r752817089



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/allreduce/AllReduceUtils.java
##

[jira] [Commented] (FLINK-24951) Allow watch bookmarks to mitigate frequent watcher rebuilding

2021-11-18 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446244#comment-17446244
 ] 

Yang Wang commented on FLINK-24951:
---

Does fabric8 Kubernetes-client 5.5.0 support bookmarks? Maybe we should not 
enable bookmark by default since most users do not suffer from this issue.

> Allow watch bookmarks to mitigate frequent watcher rebuilding
> -
>
> Key: FLINK-24951
> URL: https://issues.apache.org/jira/browse/FLINK-24951
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: Yangze Guo
>Priority: Major
> Fix For: 1.15.0
>
>
> In some production environments, a massive number of pods are created and
> deleted. Thus the global resource version is updated very quickly and may
> cause frequent watcher rebuilding because of "too old resource version"
> errors. To avoid this, K8s provides a Bookmark mechanism[1].
> I propose to enable bookmarks by default.
> [1] 
> https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][ML] Support ML Python API

2021-11-18 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r752814620



##
File path: flink-ml-python/apache_flink_ml/mllib/__init__.py
##
@@ -0,0 +1,17 @@
+

Review comment:
   I originally thought that `ml` corresponds to the `flink-ml` module, and 
`mllib` corresponds to `flink-ml-lib` which contains various algorithm 
implementations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



