[GitHub] [flink] lincoln-lil commented on a change in pull request #17699: [FLINK-20370][table] Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-11-21 Thread GitBox


lincoln-lil commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r754006462



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
   }
 }
+
+private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+  val childModifyKindSet = getModifyKindSet(sink.getInput)
+  val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+  val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+  val sinkTrait = UpdateKindTrait.fromChangelogMode(
+sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+  val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+// if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+// to beforeAndAfter mode for the correctness
+var shouldFallback: Boolean = false
+val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+if (sinkDefinedPks.nonEmpty) {
+  val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+  val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+  val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+  val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+  // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+  // fallback to beforeAndAfter.
+  // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+  // this differs from batch job's unique key inference
+  if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0

Review comment:
   This should be reserved because the metadata query may return a empty 
`changeLogUpsertKeys` set.

##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
   }
 }
+
+private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+  val childModifyKindSet = getModifyKindSet(sink.getInput)
+  val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+  val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+  val sinkTrait = UpdateKindTrait.fromChangelogMode(
+sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+  val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+// if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+// to beforeAndAfter mode for the correctness
+var shouldFallback: Boolean = false
+val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+if (sinkDefinedPks.nonEmpty) {
+  val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+  val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+  val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+  val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+  // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+  // fallback to beforeAndAfter.
+  // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+  // this differs from batch job's unique key inference
+  if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+  || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {
+shouldFallback = true
+  }
+}
+if (shouldFallback) {
+  Seq(beforeAndAfter)
+} else {
+  Seq(onlyAfter, beforeAndAfter)
+}
+  } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+Seq(beforeAndAfter)
+  } else {
+Seq(UpdateKindTrait.NONE)
+  }
+  sinkRequiredTraits
+}
+
+private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): 
Boolean = {

Review comment:
   Initially I put the two methods together, but seems a little bit 
complex, and the two methods do the different things indeed, so I change to the 
current version.

##
File path: 

[GitHub] [flink] fsk119 closed pull request #14444: [FLINK-20091][avro] add ignore-parse-error for avro formats

2021-11-21 Thread GitBox


fsk119 closed pull request #1:
URL: https://github.com/apache/flink/pull/1


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zuston edited a comment on pull request #17837: [FLINK-24963] Remove the tail separator when outputting yarn queue names

2021-11-21 Thread GitBox


zuston edited a comment on pull request #17837:
URL: https://github.com/apache/flink/pull/17837#issuecomment-973923338


   @tillrohrmann @becketqin Could you help review it? Thanks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread liuzhuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447227#comment-17447227
 ] 

liuzhuo commented on FLINK-24947:
-

It seems feasible to create the Service on the client first, and then update 
the targetPort after the JobManager starts successfully. This behavior is 
consistent with the current one, which is a better way.

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Major
> Fix For: 1.15.0
>
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread liuzhuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liuzhuo updated FLINK-24947:

Attachment: (was: image-2021-11-22-15-23-17-881.png)

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Major
> Fix For: 1.15.0
>
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread liuzhuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liuzhuo updated FLINK-24947:

Attachment: image-2021-11-22-15-23-17-881.png

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Major
> Fix For: 1.15.0
>
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] SteNicholas commented on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-21 Thread GitBox


SteNicholas commented on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-975197030


   > @SteNicholas Why not follow Flink's [code contribution 
process](https://flink.apache.org/contributing/contribute-code.html#code-contribution-process)
 to make the assignment in JIRA ticket first? If someone else also choose to 
implement the PR first, it woule make a waste of community resources and might 
bring unnecessary discussion of whose should be reviewed.
   
   @Myasuka , sorry for mistake to contribe. I will comment on this issue for 
discussion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-24781) Port from string casting logic to CastRule

2021-11-21 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-24781.

Fix Version/s: 1.15.0
   Resolution: Fixed

Fixed in master:

commit 9f7eef293f723800945a9759c50adbf8786a2bd4
[table-planner] Refactor cast of literals to use CastExecutor

commit 0426d8c7af0191f75e6aaa4696b3358de059dc67
[table-planner] Add string parsing methods to BinaryStringDataUtil and add from 
string cast rules

commit 92c02fc747f7794f2c20ac161ad5d7b9c0f2c0f8
[table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps 
the cast invocation in a try-catch

> Port from string casting logic to CastRule
> --
>
> Key: FLINK-24781
> URL: https://issues.apache.org/jira/browse/FLINK-24781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] twalthr closed pull request #17800: [FLINK-24781] Add to string casting rules

2021-11-21 Thread GitBox


twalthr closed pull request #17800:
URL: https://github.com/apache/flink/pull/17800


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #17699: [FLINK-20370][table] Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-11-21 Thread GitBox


JingsongLi commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r753993288



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
   }
 }
+
+private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+  val childModifyKindSet = getModifyKindSet(sink.getInput)
+  val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+  val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+  val sinkTrait = UpdateKindTrait.fromChangelogMode(
+sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+  val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+// if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+// to beforeAndAfter mode for the correctness
+var shouldFallback: Boolean = false
+val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema

Review comment:
   Maybe you just want `getPrimaryKeyIndexes`?

##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
   }
 }
+
+private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+  val childModifyKindSet = getModifyKindSet(sink.getInput)
+  val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+  val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+  val sinkTrait = UpdateKindTrait.fromChangelogMode(
+sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+  val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+// if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+// to beforeAndAfter mode for the correctness
+var shouldFallback: Boolean = false
+val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+if (sinkDefinedPks.nonEmpty) {
+  val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+  val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+  val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+  val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+  // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+  // fallback to beforeAndAfter.
+  // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+  // this differs from batch job's unique key inference
+  if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+  || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {

Review comment:
   `compareTo`  -> `equals`?

##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
   }
 }
+
+private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+  val childModifyKindSet = getModifyKindSet(sink.getInput)
+  val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+  val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+  val sinkTrait = UpdateKindTrait.fromChangelogMode(
+sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+  val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+// if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+// to beforeAndAfter mode for the correctness
+var shouldFallback: Boolean = false
+val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+if (sinkDefinedPks.nonEmpty) {
+  val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+  val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+  val fmq = 

[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * d3df986a75e34e1ed475b2e1236b7770698e7bd1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26740)
 
   * 3719c0402ec979c619371fcde9f2e7d2c46d69ed Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26805)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-24608) Sinks built with the unified sink framework do not receive timestamps when used in Table API

2021-11-21 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-24608.

Fix Version/s: 1.15.0
   1.14.1
 Release Note: This adds an additional operator to the topology if the new 
sink interfaces are used (e.g. for Kafka). It could cause issues in 1.14.1 when 
restoring from a 1.14 savepoint. A workaround is to cast the time attribute to 
a regular timestamp in the SQL statement closely before the sink.
   Resolution: Fixed

Fixed in master: 548b96e9cb226aaf8d919d900c9326520a5b6dc8
Fixed in 1.14.1: 3719c0402ec979c619371fcde9f2e7d2c46d69ed

For now I did not back port this fix to 1.13. 1.14 is still not adopted by many 
users whereas 1.13 is. The change could affect the savepoint restoration.

> Sinks built with the unified sink framework do not receive timestamps when 
> used in Table API
> 
>
> Key: FLINK-24608
> URL: https://issues.apache.org/jira/browse/FLINK-24608
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Table SQL / Planner
>Affects Versions: 1.14.0, 1.13.3, 1.15.0
>Reporter: Fabian Paul
>Assignee: Marios Trivyzas
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> All sinks built with the unified sink framework extract the timestamp from 
> the internal {{StreamRecord}}. The Table API does not facilitate the 
> timestamp field in the {{StreamRecord}}  but extracts the timestamp from the 
> actual data. 
> We either have to use a dedicated operator before all the sinks to simulate 
> the behavior or allow a customizable timestamp extraction during the sink 
> translation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * d3df986a75e34e1ed475b2e1236b7770698e7bd1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26740)
 
   * 3719c0402ec979c619371fcde9f2e7d2c46d69ed UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #17699: [FLINK-20370][table] Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-11-21 Thread GitBox


JingsongLi commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r753992140



##
File path: docs/layouts/shortcodes/generated/execution_config_configuration.html
##
@@ -58,6 +58,12 @@
 Enum
 The NOT NULL column constraint on a table enforces that null 
values can't be inserted into the table. Flink supports 'error' (default) and 
'drop' enforcement behavior. By default, Flink will check values and throw 
runtime exception when null values writing into NOT NULL columns. Users can 
change the behavior to 'drop' to silently drop such records without throwing 
exception.Possible 
values:"ERROR""DROP"
 
+
+table.exec.sink.pk-shuffle Streaming

Review comment:
   Can we create separate PR for this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Airblader commented on a change in pull request #17847: [FLINK-6573][Connectors/Common ] Flink MongoDB Connector

2021-11-21 Thread GitBox


Airblader commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r753989519



##
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class);
+@VisibleForTesting
+public static final String IDENTIFIER = "mongodb";
+public static final ConfigOption DATABASE = 
ConfigOptions.key("database".toLowerCase())
+.stringType()
+.noDefaultValue()
+.withDescription("The data base to connect.");
+public static final ConfigOption URI = 
ConfigOptions.key("uri".toLowerCase())
+.stringType()
+.noDefaultValue()
+.withDescription("The uri to connect.");
+public static final ConfigOption COLLECTION_NAME = ConfigOptions
+.key("collection".toLowerCase())
+.stringType()
+.noDefaultValue()
+.withDescription("The name of the collection to return.");
+public static final ConfigOption MAX_CONNECTION_IDLE_TIME = 
ConfigOptions
+.key("maxConnectionIdleTime".toLowerCase())
+.intType()
+.defaultValue(Integer.valueOf(6))
+.withDescription("The maximum idle time for a pooled connection.");
+public static final ConfigOption BATCH_SIZE = ConfigOptions
+.key("batchSize".toLowerCase())
+.intType()
+.defaultValue(Integer.valueOf(1024))
+.withDescription("The batch size when table invoking.");
+
+public static final ConfigOption FORMAT =
+ConfigOptions.key("format")
+.stringType()
+.defaultValue("json")
+.withDescription(
+"Defines the format identifier for encoding data. "
++ "The identifier is used to discover a 
suitable format factory.");
+
+@Override
+public DynamicTableSink createDynamicTableSink(Context context) {
+ContextUtil.transformContext(this, context);
+FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+helper.validate();
+
+MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
+(String) helper
+.getOptions()
+.get(DATABASE),
+(String) helper.getOptions().get(COLLECTION_NAME),
+(String) helper.getOptions().get(URI),
+((Integer) 
helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+
+TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context
+.getCatalogTable()
+.getSchema());
+LOG.info("Create dynamic mongoDB table table: {}.", mongodbSinkConf);
+return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
+}
+
+@Override
+public String factoryIdentifier() {
+return IDENTIFIER;
+}
+
+@Override
+public Set> requiredOptions() {
+Set> requiredOptions = new HashSet();
+requiredOptions.add(DATABASE);
+requiredOptions.add(COLLECTION_NAME);
+requiredOptions.add(URI);
+return requiredOptions;
+}
+
+@Override
+public Set> optionalOptions() {
+Set> optionals = new HashSet();
+

[GitHub] [flink] Airblader commented on a change in pull request #17847: [FLINK-6573][Connectors/Common ] Flink MongoDB Connector

2021-11-21 Thread GitBox


Airblader commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r753988501



##
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {

Review comment:
   Please add `@Internal` here

##
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class);
+@VisibleForTesting
+public static final String IDENTIFIER = "mongodb";
+public static final ConfigOption DATABASE = 
ConfigOptions.key("database".toLowerCase())

Review comment:
   Please move the config options and the factory identifier to a 
`*ConnectorOptions` class (refer to e.g. `KafkaConnectorOptions`) and mark that 
class `@PublicEvolving`. Also, that class and the entire connector should live 
in a `o.a.f.connector.mongodb` package for consistency, and the class with the 
options in `o.a.f.connector.mongodb.table`.

##
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import 

[GitHub] [flink-ml] zhipeng93 edited a comment on pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-11-21 Thread GitBox


zhipeng93 edited a comment on pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#issuecomment-975178654






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] zhipeng93 commented on pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-11-21 Thread GitBox


zhipeng93 commented on pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#issuecomment-975178654


   Hi @yunfengzhou-hub, thanks for the review. This PR is updated according to 
your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17805: [FLINK-24708][planner] Fix wrong results of the IN operator

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17805:
URL: https://github.com/apache/flink/pull/17805#issuecomment-969943622


   
   ## CI report:
   
   * 067f050f59f186f274c40bc349d74aadd34a23bd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26585)
 
   * dad62c65d43bf24fc8bf4152177e0c75f0bb4aec Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26803)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17805: [FLINK-24708][planner] Fix wrong results of the IN operator

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17805:
URL: https://github.com/apache/flink/pull/17805#issuecomment-969943622


   
   ## CI report:
   
   * 067f050f59f186f274c40bc349d74aadd34a23bd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26585)
 
   * dad62c65d43bf24fc8bf4152177e0c75f0bb4aec UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-24002) Support count window with the window TVF

2021-11-21 Thread Yuepeng Pan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440889#comment-17440889
 ] 

Yuepeng Pan edited comment on FLINK-24002 at 11/22/21, 6:36 AM:


Hi, [~qingru zhang]  [~MartijnVisser] . Could you please assign this ticket to 
me ? Thx.


was (Author: rocmarshal):
Hi, [~qingru zhang]  [~twalthr] . Could you please assign this ticket to me ? 
Thx.

> Support count window with the window TVF
> 
>
> Key: FLINK-24002
> URL: https://issues.apache.org/jira/browse/FLINK-24002
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Priority: Minor
>
> For a long time, count window is supported in Table API, but not supported in 
> SQL.
> With the new window TVF syntax, we can also introduce a new window function 
> for count window.
> For example, the following TUMBLE_ROW assigns windows in 10 row-count 
> interval. 
> {panel}
> {panel}
> |{{SELECT}} {{*}}
> {{FROM}} {{TABLE}}{{(}}
> {{   }}{{TUMBLE_ROW(}}
> {{ }}{{data => }}{{TABLE}} {{inputTable,}}
> {{ }}{{timecol => DESCRIPTOR(timecol),}}
> {{ }}{{size}} {{=> 10));}}|
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-11-21 Thread GitBox


zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r753980720



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModel.java
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import 
org.apache.flink.ml.classification.linear.LogisticRegressionModelData.LogisticRegressionModelDataEncoder;
+import 
org.apache.flink.ml.classification.linear.LogisticRegressionModelData.LogisticRegressionModelDataStreamFormat;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** This class implements {@link Model} for {@link LogisticRegression}. */
+public class LogisticRegressionModel
+implements Model,
+LogisticRegressionModelParams {
+
+private Map, Object> paramMap;
+
+private Table model;
+
+public LogisticRegressionModel(Map, Object> paramMap) {
+this.paramMap = paramMap;
+ParamUtils.initializeMapWithDefaultValues(this.paramMap, this);
+}
+
+public LogisticRegressionModel() {
+this(new HashMap<>());

Review comment:
   It is called in `LogisticRegressionModel(Map xxx)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17690: [FLINK-24490][docs] The sample code is wrong in Apache Kafka Connector page

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17690:
URL: https://github.com/apache/flink/pull/17690#issuecomment-961660965


   
   ## CI report:
   
   * adaacc288130da0e5ab487b1c669a3a730095eb5 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26001)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread junbiao chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447203#comment-17447203
 ] 

junbiao chen edited comment on FLINK-24975 at 11/22/21, 6:06 AM:
-

hi [~wenlong.lwl] ,I have developed some optimization rules for Spark SQL,which 
can be also applied to flink.These rules  can improve the performance of batch 
job


was (Author: dahaishuantuoba):
[~wenlong.lwl] I have developed some optimization rules for Spark SQL,which can 
be also applied to flink.These rules  can improve the performance of batch job

> Add hooks and extension points to FlinkSQL
> --
>
> Key: FLINK-24975
> URL: https://issues.apache.org/jira/browse/FLINK-24975
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: junbiao chen
>Priority: Major
>
> refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread junbiao chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447203#comment-17447203
 ] 

junbiao chen commented on FLINK-24975:
--

[~wenlong.lwl] I have developed some optimization rules for Spark SQL,which can 
be also applied to flink.These rules  can improve the performance of batch job

> Add hooks and extension points to FlinkSQL
> --
>
> Key: FLINK-24975
> URL: https://issues.apache.org/jira/browse/FLINK-24975
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: junbiao chen
>Priority: Major
>
> refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17690: [FLINK-24490][docs] The sample code is wrong in Apache Kafka Connector page

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17690:
URL: https://github.com/apache/flink/pull/17690#issuecomment-961660965


   
   ## CI report:
   
   * adaacc288130da0e5ab487b1c669a3a730095eb5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26001)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17690: [FLINK-24490][docs] The sample code is wrong in Apache Kafka Connector page

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17690:
URL: https://github.com/apache/flink/pull/17690#issuecomment-961660965


   
   ## CI report:
   
   * adaacc288130da0e5ab487b1c669a3a730095eb5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26001)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JasonLeeCoding commented on pull request #17690: [FLINK-24490][docs] The sample code is wrong in Apache Kafka Connector page

2021-11-21 Thread GitBox


JasonLeeCoding commented on pull request #17690:
URL: https://github.com/apache/flink/pull/17690#issuecomment-975149201


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 96a6ba2a83ca22091099ef36201afcbd8e719243 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26794)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] gaurav726 commented on a change in pull request #17754: [FLINK-24861][connector][jdbc] Fix false cache lookup for empty data

2021-11-21 Thread GitBox


gaurav726 commented on a change in pull request #17754:
URL: https://github.com/apache/flink/pull/17754#discussion_r753941893



##
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
##
@@ -146,10 +148,12 @@ public void eval(Object... keys) {
 if (cache != null) {
 List cachedRows = cache.getIfPresent(keyRow);
 if (cachedRows != null) {
-for (RowData cachedRow : cachedRows) {
-collect(cachedRow);
+if (!cachedRows.isEmpty() || !excludeEmptyQueryResult) {
+for (RowData cachedRow : cachedRows) {

Review comment:
   @wuchong can you please review my PR.
   cc @wenlong88 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] gaurav726 commented on a change in pull request #17754: [FLINK-24861][connector][jdbc] Fix false cache lookup for empty data

2021-11-21 Thread GitBox


gaurav726 commented on a change in pull request #17754:
URL: https://github.com/apache/flink/pull/17754#discussion_r753941893



##
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
##
@@ -146,10 +148,12 @@ public void eval(Object... keys) {
 if (cache != null) {
 List cachedRows = cache.getIfPresent(keyRow);
 if (cachedRows != null) {
-for (RowData cachedRow : cachedRows) {
-collect(cachedRow);
+if (!cachedRows.isEmpty() || !excludeEmptyQueryResult) {
+for (RowData cachedRow : cachedRows) {

Review comment:
   @wuchong can you please review my PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r753910367



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = 

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r753905182



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/EuclideanDistance.java
##
@@ -0,0 +1,272 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.appendVectorToMatrix;
+
+/**
+ * Euclidean distance is the "ordinary" straight-line distance between two 
points in Euclidean
+ * space.
+ *
+ * https://en.wikipedia.org/wiki/Euclidean_distance
+ *
+ * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| 
means the L2 norm of
+ * the vector.
+ */
+public class EuclideanDistance implements Serializable {

Review comment:
   If it's just about adding some optimizations, it might also be suitable 
for kmeans. I am still not sure that we should have separate euclidean distance 
for knn and kmeans.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r753928296



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasBatchSize.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared batchSize param. */
+public interface HasBatchSize extends WithParams {
+
+Param BATCH_SIZE =
+new IntParam(
+"batchSize",
+"batch size of training algorithms. The default value is 
100.",

Review comment:
   Let's make descriptions start with Uppercase letter and ends with full 
stop. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r753930171



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasL2.java
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared L2 regularization param. */
+public interface HasL2 extends WithParams {
+Param L_2 =
+new DoubleParam(
+"l2",
+"The L2-regularized parameter. The default value is 0.",

Review comment:
   We might need to establish a unified practice of whether to describe the 
default value (or accepted values in enum params). We can discuss it offline.

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticGradient.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.ml.common.linalg.BLAS;
+
+import java.io.Serializable;
+
+/** Logistic gradient. */

Review comment:
   Shall we have a more detailed description for this class? It might be 
difficult to infer the functionality of this class by reading this javadoc.

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasBatchSize.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared batchSize param. */
+public interface HasBatchSize extends WithParams {
+
+Param BATCH_SIZE =
+new IntParam(
+"batchSize",
+"batch size of training algorithms. The default value is 
100.",

Review comment:
   A better style would be make descriptions start with Uppercase letter 
and ends with full stop. 

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModel.java
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the 

[GitHub] [flink] dailai commented on pull request #17847: [FLINK-6573][Connectors/Common ] Flink MongoDB Connector

2021-11-21 Thread GitBox


dailai commented on pull request #17847:
URL: https://github.com/apache/flink/pull/17847#issuecomment-975096035


   不支持upsert么


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447194#comment-17447194
 ] 

Wenlong Lyu commented on FLINK-24975:
-

hi, [~dahaishuantuoba], it is an interesting idea. Do you have any actual use 
cases depending on this feature?

> Add hooks and extension points to FlinkSQL
> --
>
> Key: FLINK-24975
> URL: https://issues.apache.org/jira/browse/FLINK-24975
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: junbiao chen
>Priority: Major
>
> refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24949) KafkaITCase.testBigRecordJob fails on azure

2021-11-21 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447195#comment-17447195
 ] 

Jiangjie Qin commented on FLINK-24949:
--

[~gaoyunhaii] not sure if this is the same cause. [~renqs] can you take a look?

> KafkaITCase.testBigRecordJob fails on azure
> ---
>
> Key: FLINK-24949
> URL: https://issues.apache.org/jira/browse/FLINK-24949
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.1
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
> Fix For: 1.14.1
>
>
> {code:java}
> Nov 17 23:39:39 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 222.57 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Nov 17 23:39:39 [ERROR] testBigRecordJob  Time elapsed: 60.02 s  <<< ERROR!
> Nov 17 23:39:39 org.junit.runners.model.TestTimedOutException: test timed out 
> after 6 milliseconds
> Nov 17 23:39:39   at sun.misc.Unsafe.park(Native Method)
> Nov 17 23:39:39   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Nov 17 23:39:39   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Nov 17 23:39:39   at 
> org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:58)
> Nov 17 23:39:39   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBigRecordTestTopology(KafkaConsumerTestBase.java:1473)
> Nov 17 23:39:39   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testBigRecordJob(KafkaITCase.java:119)
> Nov 17 23:39:39   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 17 23:39:39   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 17 23:39:39   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 17 23:39:39   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 17 23:39:39   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 17 23:39:39   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 17 23:39:39   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 17 23:39:39   at java.lang.Thread.run(Thread.java:748)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26679=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=7161
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24918) Support to specify the data dir for state benchmark

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447188#comment-17447188
 ] 

Yun Tang commented on FLINK-24918:
--

Merged in flink-master: 726eead617bb3392df6a6ef1681de62c631bbeb5

> Support to specify the data dir for state benchmark 
> 
>
> Key: FLINK-24918
> URL: https://issues.apache.org/jira/browse/FLINK-24918
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Minor
>  Labels: pull-request-available
>
> {{StateBackendBenchmarkUtils}} use null as the parent dir to create temp dir, 
> which will finally use the /tmp as the data dir. It has two downsides:
> 1. the /tmp dir often mount with tmpfs, which may store data in memory. it 
> will affect the result of rocksdb benchmark
> 2. It can not support to use benchmark to measure the performance on a new 
> device. 
> So I purpose to enhance the state benchmark to support specify the default 
> data dir. And avoiding to use the {{/tmp}} as default.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] Myasuka closed pull request #17846: [FLINK-24918][Runtime/State Backends]Support to specify the data dir …

2021-11-21 Thread GitBox


Myasuka closed pull request #17846:
URL: https://github.com/apache/flink/pull/17846


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447187#comment-17447187
 ] 

Yun Tang commented on FLINK-24926:
--

[~liuhb86] Can you repeat this error every time?
I use test code below to verify but not ever come across your error message for 
minutes.

{code:java}
  @Test
  def testJoinStream(): Unit = {
val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build();
val tableEnv = TableEnvironment.create(settings)
val configuration = tableEnv.getConfig.getConfiguration
configuration.setString("table.exec.resource.default-parallelism", "2");

val testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
  .schema(Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
.column("v", DataTypes.INT())
.watermark("ts", "ts - INTERVAL '1' second")
.build())
  .option("rows-per-second", "2")
  .option("fields.v.kind", "sequence")
  .option("fields.v.start", "0")
  .option("fields.v.end", "100")
  .build())
testTable.printSchema();
tableEnv.createTemporaryView("test", testTable );

val joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
  " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");

  val tableResult = 
joined.executeInsert(TableDescriptor.forConnector("print").build());
  tableResult.await()
  }
{code}
 

> Key group is not in KeyGroupRange when joining two streams with table API
> -
>
> Key: FLINK-24926
> URL: https://issues.apache.org/jira/browse/FLINK-24926
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a simple test to join two streams by the event time:
>  
> {code:java}
> @Test
> void testJoinStream() {
> var settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> var tableEnv = TableEnvironment.create(settings);
> var configuration = tableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.resource.default-parallelism", "2");
> var testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
> .schema(Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("v", DataTypes.INT())
> .watermark("ts", "ts - INTERVAL '1' second")
> .build())
> .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L)
> .option("fields.v.kind", "sequence")
> .option("fields.v.start", "0")
> .option("fields.v.end", "100")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
> " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");
> try {
> var tableResult = 
> joined.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> } catch (InterruptedException | ExecutionException e) {
> throw new RuntimeException(e);
> }
> } {code}
> It failed within a few seconds:
> {code:java}
> (
>   `ts` TIMESTAMP(3) *ROWTIME*,
>   `v` INT,
>   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second
> )
> 1> +I[2021-11-16T17:48:24.415, 1, 1]
> 1> +I[2021-11-16T17:48:24.415, 0, 1]
> 1> +I[2021-11-16T17:48:24.415, 1, 0]
> 1> +I[2021-11-16T17:48:24.415, 0, 0]
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
>     at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> 

[GitHub] [flink] flinkbot edited a comment on pull request #17736: [FLINK-24694][docs-zh] Translate "Checkpointing under backpressure" page into Chinese

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17736:
URL: https://github.com/apache/flink/pull/17736#issuecomment-964148175


   
   ## CI report:
   
   * 24962d92cd64eb08c3c404fcb9aff1b54be54eab Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26801)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24967) Make the IO pattern configureable in state benchmarks

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447186#comment-17447186
 ] 

Yun Tang commented on FLINK-24967:
--

I think introducing various size of state related operations should be helpful. 
Current configuration of IO size just balance the end-to-end benchmark running 
duration and the abililty to verify different state-backends performance.

> Make the IO pattern configureable in state benchmarks
> -
>
> Key: FLINK-24967
> URL: https://issues.apache.org/jira/browse/FLINK-24967
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Priority: Minor
>
> Currently, state benchmarks IO size are controlled by 
> {{StateBenchmarkConstants}}, which are not flexible to change. It's not easy 
> to test the performance under different IO size/pattern and different disk 
> (which can be solved by 
> [FLINK-24918|https://issues.apache.org/jira/browse/FLINK-24918]). I purpose 
> to make the state benchmark more configurable .



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread junbiao chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junbiao chen updated FLINK-24975:
-
Component/s: Table SQL / Planner

> Add hooks and extension points to FlinkSQL
> --
>
> Key: FLINK-24975
> URL: https://issues.apache.org/jira/browse/FLINK-24975
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: junbiao chen
>Priority: Major
>
> refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread junbiao chen (Jira)
junbiao chen created FLINK-24975:


 Summary: Add hooks and extension points to FlinkSQL
 Key: FLINK-24975
 URL: https://issues.apache.org/jira/browse/FLINK-24975
 Project: Flink
  Issue Type: New Feature
Reporter: junbiao chen


refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447179#comment-17447179
 ] 

Yang Wang commented on FLINK-24947:
---

Thanks for your detailed comments.

I am curious that whether we could still create the internal/rest services on 
the client side and then update the {{targetPort}} after the JM has allocated a 
dynamic port. Then it might be easier to let the client could connect to 
JobManager via service.

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Minor
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-24947:
--
Fix Version/s: 1.15.0

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Major
> Fix For: 1.15.0
>
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24947) Flink on k8s support HostNetWork model

2021-11-21 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-24947:
--
Priority: Major  (was: Minor)

> Flink on k8s support HostNetWork model
> --
>
> Key: FLINK-24947
> URL: https://issues.apache.org/jira/browse/FLINK-24947
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: liuzhuo
>Priority: Major
>
> For the use of flink on k8s, for performance considerations, it is important 
> to choose a CNI plug-in. Usually we have two environments: Managed and 
> UnManaged.
>   Managed: Cloud vendors usually provide very efficient CNI plug-ins, we 
> don’t need to care about network performance issues
>   UnManaged: On self-built K8s clusters, CNI plug-ins are usually optional, 
> similar to Flannel and Calcico, but such software network cards usually lose 
> some performance or require some additional network strategies.
> For an unmanaged environment, if we also want to achieve the best network 
> performance, should we support the *HostNetWork* model?
> Use the host network to achieve the best performance



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] Myasuka commented on pull request #17833: [FLINK-24785][runtime] Relocate RocksDB's log under flink log directory by default

2021-11-21 Thread GitBox


Myasuka commented on pull request #17833:
URL: https://github.com/apache/flink/pull/17833#issuecomment-975029611


   @SteNicholas Why not follow Flink's [code contribution 
process](https://flink.apache.org/contributing/contribute-code.html#code-contribution-process)
 to make the assignment in JIRA ticket first? If someone else also choose to 
implement the PR first, it woule make a waste of community resources and might 
bring unnecessary discussion of whose should be reviewed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24961) When the DDL statement is different from the actual schema in the database, ArrayIndexOutOfBoundsException will be reported

2021-11-21 Thread Fangliang Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447177#comment-17447177
 ] 

Fangliang Liu commented on FLINK-24961:
---

Hi  [~jark] [~lzljs3620320], Take a look, thanks.

> When the DDL statement is different from the actual schema in the database, 
> ArrayIndexOutOfBoundsException will be reported 
> 
>
> Key: FLINK-24961
> URL: https://issues.apache.org/jira/browse/FLINK-24961
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.13.2
>Reporter: Fangliang Liu
>Priority: Major
>
> DDL
> {code:java}
> CREATE TABLE if not exists table_a (
>`user_id` BIGINT NULL COMMENT '',
>`id` BIGINT NULL COMMENT '',
>`position_id` BIGINT NULL COMMENT '',
>`status` STRING NULL COMMENT '',
>`transaction_id` BIGINT NULL COMMENT '',
> PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
> ) WITH(
>   'connector'='kafka',
>   'topic'='',
>   'properties.bootstrap.servers'='xxx',
>   'properties.group.id'='xxx',
>   'properties.auto.offset.reset'='earliest',
>   'scan.startup.mode'='earliest-offset',
>   'format'='debezium-avro-confluent',
>   'debezium-avro-confluent.schema-registry.url'=''
>   );
> CREATE TABLE if not exists table_b (
>  `user_id` BIGINT NULL COMMENT '',
>  `id` BIGINT NULL COMMENT '',
>  `position_id` BIGINT NULL COMMENT '',
>  `status` STRING NULL COMMENT '',
>  `transaction_id` BIGINT NULL COMMENT '',
> ) WITH (
>   'connector' = 'tidb',
>   'tidb.database.url' = 'jdbc:mysql://',
>   'tidb.username' = '',
>   'tidb.password' = 'x',
>   'tidb.database.name' = 'x',
>   'tidb.maximum.pool.size' = '1',
>   'tidb.minimum.idle.size' = '1',
>   'tidb.table.name' = 'withdraws',
>   'tidb.write_mode' = 'upsert',
>   'sink.buffer-flush.max-rows' = '0'
>   );
> insert into table_b select * from table_a;
> {code}
> The actual schema in tidb has one more auto-increment column than table_b, 
> and the following error is reported when the task is started
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
> ~[?:1.8.0_291]
>   at 
> java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
>  ~[?:1.8.0_291]
>   at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
> ~[?:1.8.0_291]
>   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_291]
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_291]
>   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
> ~[?:1.8.0_291]
>   at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>  ~[?:1.8.0_291]
>   at 
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
> ~[?:1.8.0_291]
>   at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>  ~[flink-tidb-connector-1.13-0.0.4.jar:?]
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
>  ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>   at 
> 

[jira] [Commented] (FLINK-24953) Optime hive parallelism inference

2021-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447174#comment-17447174
 ] 

Jingsong Lee commented on FLINK-24953:
--

cc: [~jark] 

> Optime hive parallelism inference
> -
>
> Key: FLINK-24953
> URL: https://issues.apache.org/jira/browse/FLINK-24953
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.13.0, 1.14.0
>Reporter: xiangqiao
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when I disable hive table source parallelism inference using 
> configuration  and set parallelism.default: 100. 
> {code:java}
> table.exec.hive.infer-source-parallelism: false 
> parallelism.default: 100{code}
> The result is that the parallelism of hive table source is {*}1{*}, and the 
> configuration of the default parallelism is not effective.
> I will optimize this problem. In the future, when disable hive table source 
> parallelism inference ,the  parallelism of hive table source will be 
> determined according to the following order:
>  
> 1. If table.exec.resource.default-parallelism is set, the configured value 
> will be used
> 2. If parallelism.default is set, the configured value is used
> 3. If the above two configuration items are not set, the default value is 1
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24758) filesystem sink: partition.time-extractor.kind support "yyyyMMdd"

2021-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447175#comment-17447175
 ] 

Jingsong Lee commented on FLINK-24758:
--

CC: [~jark] 

> filesystem sink: partition.time-extractor.kind support "MMdd"
> -
>
> Key: FLINK-24758
> URL: https://issues.apache.org/jira/browse/FLINK-24758
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Jingsong Lee
>Assignee: liwei li
>Priority: Major
>  Labels: pull-request-available
>
> Now, only supports -mm-dd hh:mm:ss, we can add a new time-extractor kind 
> to support MMdd in a single partition field
> .



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r753896511



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java
##
@@ -0,0 +1,32 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** knn fit parameters. */
+public interface KnnParams
+extends WithParams,
+HasVectorColDefaultAsNull,
+HasLabelCol,
+HasFeatureColsDefaultAsNull,
+HasPredictionCol {
+/**
+ * @cn-name topK
+ * @cn topK
+ */
+Param K = new IntParam("k", "k", 10, ParamValidators.gt(0));
+
+default Integer getK() {

Review comment:
   Just noticed that this `getK()` method is only used in tests, not in 
`Knn` algorithm. Is there any problem with the algorithm's implementation?

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasFeatureColsDefaultAsNull.java
##
@@ -0,0 +1,28 @@
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Params of the names of the feature columns used for training in the input 
table. */
+public interface HasFeatureColsDefaultAsNull extends WithParams {
+/**
+ * @cn-name 特征列名数组
+ * @cn 特征列名数组,默认全选
+ */
+Param FEATURE_COLS =

Review comment:
   Can we reuse classes like `StringArrayParam`? Same for other classes.

##
File path: flink-ml-api/src/main/java/org/apache/flink/ml/linalg/Vector.java
##
@@ -29,6 +29,10 @@
 /** Gets the value of the ith element. */
 double get(int i);
 
+
+/** set the value of the ith element. */
+void set(int i, double val);

Review comment:
   Hmm... I still cannot agree with this. I read the usage of 
`Vector.set()` in Knn and I think we can satisfy the need to update a `Vector` 
in knn without adding much memory burden by creating and updating a double 
array before using the array to create a `Vector`. 

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import 

[jira] [Closed] (FLINK-16937) ParquetTableSource should generate correct isFilterPushedDown

2021-11-21 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-16937.

Resolution: Won't Fix

ParquetTableSource is removed in latest version.

> ParquetTableSource should generate correct isFilterPushedDown
> -
>
> Key: FLINK-16937
> URL: https://issues.apache.org/jira/browse/FLINK-16937
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Jingsong Lee
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
>  
> {code:java}
> if (predicate != null) {
>this.isFilterPushedDown = true;
> }
> {code}
> If all filters can not be converted to parquet filter, the predicate will be 
> null, this will lead to false isFilterPushdedDown, which is wrong.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 96a6ba2a83ca22091099ef36201afcbd8e719243 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26794)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 96a6ba2a83ca22091099ef36201afcbd8e719243 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26794)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] maosuhan commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


maosuhan commented on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-975009706


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17736: [FLINK-24694][docs-zh] Translate "Checkpointing under backpressure" page into Chinese

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17736:
URL: https://github.com/apache/flink/pull/17736#issuecomment-964148175


   
   ## CI report:
   
   * 8171263c979b3bb12394d08700361b3632f74191 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26222)
 
   * 24962d92cd64eb08c3c404fcb9aff1b54be54eab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26801)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17736: [FLINK-24694][docs-zh] Translate "Checkpointing under backpressure" page into Chinese

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17736:
URL: https://github.com/apache/flink/pull/17736#issuecomment-964148175


   
   ## CI report:
   
   * 8171263c979b3bb12394d08700361b3632f74191 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26222)
 
   * 24962d92cd64eb08c3c404fcb9aff1b54be54eab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-21 Thread GitBox


wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r753895825



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,39 +247,50 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider safelyDeployCluster(
+SupplierWithException, Exception> 
supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
+LOG.info(
+"Create flink cluster {} successfully, JobManager Web 
Interface: {}",
+clusterId,
+clusterClient.getWebInterfaceURL());
+}
+return clusterClientProvider;
 } catch (Exception e) {
 try {
-LOG.warn(
-"Failed to create the Kubernetes cluster \"{}\", try 
to clean up the residual resources.",
-clusterId);
 client.stopAndCleanupCluster(clusterId);
-} catch (Exception e1) {
-LOG.info(
+} catch (Exception ex) {
+LOG.warn(
 "Failed to stop and clean up the Kubernetes cluster 
\"{}\".",
 clusterId,
-e1);
+ex);
 }
-throw new ClusterDeploymentException(

Review comment:
   Also here, why you remove the exception message here.

##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -155,19 +156,14 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 @Override
 public ClusterClientProvider deploySessionCluster(
 ClusterSpecification clusterSpecification) throws 
ClusterDeploymentException {
-final ClusterClientProvider clusterClientProvider =
-deployClusterInternal(
-KubernetesSessionClusterEntrypoint.class.getName(),
-clusterSpecification,
-false);
-
-try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
-LOG.info(
-"Create flink session cluster {} successfully, JobManager 
Web Interface: {}",
-clusterId,
-clusterClient.getWebInterfaceURL());
-}
-return clusterClientProvider;
+final SupplierWithException, Exception> 
supplier =

Review comment:
   Do we really need to have such local 

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-21 Thread GitBox


yunfengzhou-hub commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r753894135



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,343 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.extractObject;
+import static org.apache.flink.ml.classification.knn.KnnUtil.findColTypes;
+import static org.apache.flink.ml.classification.knn.KnnUtil.pGson;
+import static 
org.apache.flink.ml.classification.knn.KnnUtil.resolvedSchema2Schema;
+
+/** Knn classification model fitted by KnnClassifier. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+public static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = outputSchema.getColumnDataTypes().toArray(new 
DataType[0]);
+

[jira] [Closed] (FLINK-24969) pyflink/table/tests/test_pandas_udf.py hangs on azure

2021-11-21 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-24969.
---
Resolution: Duplicate

It seems that it should be caused by the same reason as FLINK-24764. I'm 
closing this ticket and fixing it there.

> pyflink/table/tests/test_pandas_udf.py hangs on azure
> -
>
> Key: FLINK-24969
> URL: https://issues.apache.org/jira/browse/FLINK-24969
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.3
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Nov 20 05:11:02   at 
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> Nov 20 05:11:02   at 
> java.net.SocketInputStream.read(SocketInputStream.java:171)
> Nov 20 05:11:02   at 
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> Nov 20 05:11:02   at 
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> Nov 20 05:11:02   at 
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> Nov 20 05:11:02   at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> Nov 20 05:11:02   - locked <0x9379b830> (a 
> java.io.InputStreamReader)
> Nov 20 05:11:02   at 
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> Nov 20 05:11:02   at java.io.BufferedReader.fill(BufferedReader.java:161)
> Nov 20 05:11:02   at 
> java.io.BufferedReader.readLine(BufferedReader.java:324)
> Nov 20 05:11:02   - locked <0x9379b830> (a 
> java.io.InputStreamReader)
> Nov 20 05:11:02   at 
> java.io.BufferedReader.readLine(BufferedReader.java:389)
> Nov 20 05:11:02   at 
> org.apache.flink.api.python.shaded.py4j.CallbackConnection.readBlockingResponse(CallbackConnection.java:169)
> Nov 20 05:11:02   at 
> org.apache.flink.api.python.shaded.py4j.CallbackConnection.sendCommand(CallbackConnection.java:148)
> Nov 20 05:11:02   at 
> org.apache.flink.api.python.shaded.py4j.CallbackClient.sendCommand(CallbackClient.java:384)
> Nov 20 05:11:02   at 
> org.apache.flink.api.python.shaded.py4j.CallbackClient.sendCommand(CallbackClient.java:356)
> Nov 20 05:11:02   at 
> org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
> Nov 20 05:11:02   at com.sun.proxy.$Proxy19.ping(Unknown Source)
> Nov 20 05:11:02   at 
> org.apache.flink.client.python.PythonGatewayServer.main(PythonGatewayServer.java:94)
> Nov 20 05:11:02 
> Nov 20 05:11:02 "VM Thread" os_prio=0 tid=0x7f7874076000 nid=0x31da0 
> runnable 
> Nov 20 05:11:02 
> Nov 20 05:11:02 "GC task thread#0 (ParallelGC)" os_prio=0 
> tid=0x7f7874021800 nid=0x31d9e runnable 
> Nov 20 05:11:02 
> Nov 20 05:11:02 "GC task thread#1 (ParallelGC)" os_prio=0 
> tid=0x7f7874023000 nid=0x31d9f runnable 
> Nov 20 05:11:02 
> Nov 20 05:11:02 "VM Periodic Task Thread" os_prio=0 tid=0x7f78740c 
> nid=0x31da7 waiting on condition 
> Nov 20 05:11:02 
> Nov 20 05:11:02 JNI global references: 4563
> Nov 20 05:11:02 
> Nov 20 05:11:02 Killing process with pid=636 and all descendants
> /__w/1/s/tools/ci/watchdog.sh: line 113:   636 Terminated  $cmd
> Nov 20 05:11:02 Process exited with EXIT CODE: 143.
> Nov 20 05:11:02 Trying to KILL watchdog (632).
> Nov 20 05:11:02 Searching for .dump, .dumpstream and related files in 
> '/__w/1/s'
> The STDIO streams did not close within 10 seconds of the exit event from 
> process '/bin/bash'. This may indicate a child process inherited the STDIO 
> streams and has not yet exited.
> ##[error]Bash exited with code '143'.
> Finishing: Test - python {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26776=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490=23553



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24803) Fix cast BINARY/VARBINARY to STRING

2021-11-21 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447166#comment-17447166
 ] 

Shen Zhu commented on FLINK-24803:
--

Hi Timo([~twalthr] ), Marios([~matriv]), I'm interested in working on this 
ticket, would you mind assigning it to me? Thanks!

> Fix cast BINARY/VARBINARY to STRING
> ---
>
> Key: FLINK-24803
> URL: https://issues.apache.org/jira/browse/FLINK-24803
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> BINARY/VARBINARY should be printed as regular arrays instead of interpreting 
> them in an arbitrary character set as a string.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14458) Assert implementation classes of catalog objects

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14458:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Assert implementation classes of catalog objects
> 
>
> Key: FLINK-14458
> URL: https://issues.apache.org/jira/browse/FLINK-14458
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> We need to assert implementation classes of catalog objects, including 
> table/view/function/partition/stats/db, to make sure they are of the right 
> implementations



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-10346) MemoryStateBackend does not clean up checkpoint directory

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10346:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> MemoryStateBackend does not clean up checkpoint directory
> -
>
> Key: FLINK-10346
> URL: https://issues.apache.org/jira/browse/FLINK-10346
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> The {{StateBackendLoader}} creates a random subdirectory under 
> {{HighAvailabilityOptions.HA_STORAGE_PATH}} for the {{MemoryStateBackend}} if 
> no checkpointing directory has been specified (see 
> {{StateBackendLoader.java:246}}). The problem is that this directory gets 
> never removed after the {{MemoryStateBackend}} is closed. It is important to 
> clean up these created directories.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-10442) Make declined savepoints visible in WebUI

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10442:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Make declined savepoints visible in WebUI
> -
>
> Key: FLINK-10442
> URL: https://issues.apache.org/jira/browse/FLINK-10442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends, Runtime / Web Frontend
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> There are some reasons in {{CheckpointCoordinator#triggerCheckpoint()}} for 
> failing a checkpoint/savepoint early in which case it would only show up in 
> the logs (if even) but never in the web UI.
> Now for checkpoints this makes sense since a lot of checkpoints may 
> (currently) be executed before all tasks are actually running.
> However, for (user-triggered) savepoints, you would not get too much clutter 
> into the web UI (or the logs) and you would expect them to show up as well 
> for debugging purposes.
> I propose to change the behaviour accordingly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16468:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux ubuntu servers running, patch current latest 
> Ubuntu patch current release java 8 JRE
>Reporter: Jason Kania
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-10305) flink-conf.yaml grows continuously

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10305:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> flink-conf.yaml grows continuously  
> 
>
> Key: FLINK-10305
> URL: https://issues.apache.org/jira/browse/FLINK-10305
> Project: Flink
>  Issue Type: Bug
>  Components: flink-docker
>Reporter: Dimitrije
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> `query.server.port` & `blob.server.port` variables are continuously appended 
> to the flink-conf.yaml when the job manager restarts.
>  
> Running a jobmanager & taskmanager using docker-compose
> I am using a single `flink-conf.yaml` which is mounted as a volume to a 
> jobmanager and taskmanager container.  Every time the jobmanager restarts, it 
> appends the two variables to the end of the file causing it to grow.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16506:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> SqlCreateTable can not get the original text when there exists non-ascii char 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
> definition
> `  x varchar comment 'Flink 社区', \n`
> ```
>   @Test
>   public void testCreateTableWithComment() {
>   conformance0 = FlinkSqlConformance.HIVE;
>   check("CREATE TABLE tbl1 (\n" +
>   "  a bigint comment 'test column comment 
> AAA.',\n" +
>   "  h varchar, \n" +
>   "  x varchar comment 'Flink 社区', \n" +
>   "  g as 2 * (a + 1), \n" +
>   "  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
> \n" +
>   "  b varchar,\n" +
>   "  proc as PROCTIME(), \n" +
>   "  PRIMARY KEY (a, b)\n" +
>   ")\n" +
>   "comment 'test table comment ABC.'\n" +
>   "PARTITIONED BY (a, h)\n" +
>   "  with (\n" +
>   "'connector' = 'kafka', \n" +
>   "'kafka.topic' = 'log.test'\n" +
>   ")\n",
>   "CREATE TABLE `TBL1` (\n" +
>   "  `A`  BIGINT  COMMENT 'test column comment 
> AAA.',\n" +
>   "  `H`  VARCHAR,\n" +
>   "  `X` VARCHAR COMMENT 'Flink 社区', \n" +
>   "  `G` AS (2 * (`A` + 1)),\n" +
>   "  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
> HH:mm:ss'),\n" +
>   "  `B`  VARCHAR,\n" +
>   "  `PROC` AS `PROCTIME`(),\n" +
>   "  PRIMARY KEY (`A`, `B`)\n" +
>   ")\n" +
>   "COMMENT 'test table comment ABC.'\n" +
>   "PARTITIONED BY (`A`, `H`)\n" +
>   "WITH (\n" +
>   "  'connector' = 'kafka',\n" +
>   "  'kafka.topic' = 'log.test'\n" +
>   ")");
>   }
> ```
> the actual unparse of x column will be   ` X`  VARCHAR  COMMENT u&'Flink 
> \793e\533a' instead of our expection.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10382:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if 

[jira] [Updated] (FLINK-10409) Collection data sink does not propagate exceptions

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10409:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Collection data sink does not propagate exceptions
> --
>
> Key: FLINK-10409
> URL: https://issues.apache.org/jira/browse/FLINK-10409
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Dawid Wysakowicz
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> I would assume that this test should fail with {{RuntimeException}}, but it 
> actually runs just fine.
> {code}
> @Test
> public void testA() throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   List resultList = new ArrayList<>();
>   SingleOutputStreamOperator result = 
> env.fromElements("A").map(obj -> {
>   throw new RuntimeException();
>   });
>   DataStreamUtils.collect(result).forEachRemaining(resultList::add);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14550) Can't use proctime attribute when register datastream to flink table which uses some fields in the nested fields

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14550:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Can't use proctime attribute when register datastream  to flink table which  
> uses some fields in the nested fields 
> ---
>
> Key: FLINK-14550
> URL: https://issues.apache.org/jira/browse/FLINK-14550
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: hehuiyuan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *_The data schame :_*
>  
> final String schemaString =
>  "
> {\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
>  + "\"fields\": [
> {\"name\":\"name\",\"type\":\"string\"}
> ,
> {\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}
> ," +
>  "
> {\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}
> ,
> {\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}
> " +
>  ",\{\"name\":\"type_double_test\",\"type\":\"double\"},
> {\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]}
> ," +
>  "
> {\"name\":\"type_bool_test\",\"type\":[\"boolean\"]}
> ,{\"name\":\"type_array_string\",\"type\":" +
>  
> "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
>  +
>  
> "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",
> {\"type\":\"array\"," + "\"items\":\"string\"}
> ],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
>  
> "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
>  +
>  "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",
> {\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16}
> ],\"size\":16},
> {\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}
> ," +
>  
> *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[**{\"name\":\"num\","
>  
> +"\"type\":\"int\"}**,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
>  +*
>  
> *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*
> {\"name\":\"type_bytes\"," + "\"type\":\"bytes\"}
> ,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
>  +
>  
> "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
>  +
>  
> "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
>  +
>  
> "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
>  +
>  
> "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
>  +
>  
> "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
>  +
>  
> "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
>  
> *_The code :_*
> tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");
>  
> _*The error is as follows:*_
> Exception in thread "main" org.apache.flink.table.api.TableException: The 
> proctime attribute can only be appended to the table schema and not replace 
> an existing field. Please move 'userActionTime' to the end of the 
> schema.Exception in thread "main" org.apache.flink.table.api.TableException: 
> The proctime attribute can only be appended to the table schema and not 
> replace an existing field. Please move 'userActionTime' to the end of the 
> schema. at 
> org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

[jira] [Updated] (FLINK-10280) json that contains an object can not be parsed

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10280:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> json that contains an object can not be parsed
> --
>
> Key: FLINK-10280
> URL: https://issues.apache.org/jira/browse/FLINK-10280
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.6.0
>Reporter: sean.miao
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> h3. *problem*
> data like :
> {"key":"\\{\"key\":1}
> "}
> can be parsed correctly;
>  
> but data like :
> {"key":\{"key":1}}
> cannot be parsed correctly.
>  
> h3. *code position:*
> JsonRowDeserializationSchema#convert
> else {
>  // for types that were specified without JSON schema
>  // e.g. POJOs
>  try
> { return objectMapper.treeToValue(node, info.getTypeClass()); }
> catch (JsonProcessingException e)
> { throw new IllegalStateException("Unsupported type information '" + info + 
> "' for node: " + node); }
> }
> h3.  *error msg:*
> java.lang.IllegalStateException: Unsupported type information 'String' for 
> node: \{"title":"aaa","test":"ttt"}
>  at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convert(JsonRowDeserializationSchema.java:130)
>  at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertRow(JsonRowDeserializationSchema.java:183)
>  at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
>  at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:44)
>  at 
> org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
>  
> h3. *My solution is as follows:*
> else {
>  // for types that were specified without JSON schema
>  // e.g. POJOs
>  try
> { return objectMapper.treeToValue(node, info.getTypeClass()); }
> catch (JsonProcessingException e)
> { return node.toString }
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14412) Rename ML Pipeline to MLPipeline

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14412:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Rename ML Pipeline to MLPipeline
> 
>
> Key: FLINK-14412
> URL: https://issues.apache.org/jira/browse/FLINK-14412
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Aljoscha Krettek
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> In FLINK-14290 we introduced a {{Pipeline}} interface in {{flink-core}} as 
> the common interface of Flink Jobs/Pipelines. Unfortunately, this name 
> clashes with {{Pipeline}} in the ML package. My suggestion is to rename 
> {{Pipeline}} in the ML package to {{MLPipeline}}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-16452) Insert into static partition doesn't support order by or limit

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16452:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Insert into static partition doesn't support order by or limit
> --
>
> Key: FLINK-16452
> URL: https://issues.apache.org/jira/browse/FLINK-16452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Rui Li
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> The following example would fail:
> {code}
> create table src (x int, y string);
> create table dest (x int) partitioned by (p string, q string);
> insert into dest partition (p='a') select * from src order by x limit 10;
> {code}
> The error is: {{"INSERT INTO  PARTITION statement only support SELECT 
> clause for now"}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14438) Fix RowTypeInfo.equals()

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14438:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Fix RowTypeInfo.equals()
> 
>
> Key: FLINK-14438
> URL: https://issues.apache.org/jira/browse/FLINK-14438
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Timo Walther
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> As discussed in FLINK-12848, the {{equals}} method of {{RowTypeInfo}} causes 
> confusion because it does not consider field names which makes it difficult 
> to use the type in data structures (such as hash maps) or testing.
> So far {{RowTypeInfo}} is marked as  {{@PublicEvolving}}. However, it is not 
> feasible to perform changes to {{RowTypeInfo.equals()}} in the current code 
> base. The number of affected tests (>80) shows that this change is not 
> trivial and would affect the stability of the legacy planner. However, most 
> of the failing tests are in the legacy planner. It seems the Blink code has 
> no problems with this change which is great! We suggest to apply the changes 
> once we drop the legacy planner.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14514) Improve interface of FunctionContext#getJobParameter

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14514:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Improve interface of FunctionContext#getJobParameter
> 
>
> Key: FLINK-14514
> URL: https://issues.apache.org/jira/browse/FLINK-14514
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Currentlly, {{FunctionContext#getJobParameter}} gets the key-values from 
> env's ExecutionConfig, but during expression reducing, it gets key-values 
> from {{TableConfig#getConfiguration}} . As we are aiming to not expose 
> underlying env which means users can't access ExecutionConfig in the future. 
> So I propose to get from {{TableConfig#getConfiguration}} and make it clear 
> in javadoc. 
> This might be a in-compatible way, but considering it is not heavily used, I 
> think it's fine. We can add a release note for this.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14508) google protobuf is not shaded

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14508:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> google protobuf is not shaded 
> --
>
> Key: FLINK-14508
> URL: https://issues.apache.org/jira/browse/FLINK-14508
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.1, 1.8.2
>Reporter: YING HOU
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: image-2019-10-23-16-55-00-959.png, 
> image-2019-10-25-15-57-08-031.png, image-2019-10-25-15-57-26-129.png, 
> image-2019-10-25-16-05-37-336.png
>
>
> I try to use phoenix in my flink project. When I use 
> 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is 
> inherited from 'org.apache.calcite.avatica.remote.Driver', I got a 
> ClassNotFoundException as follow:
> !image-2019-10-23-16-55-00-959.png!
>  
> I guess the protobuf-java may not be shaded in the module flink-table-planner



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-10408) Cannot use window information in Table functions

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10408:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Cannot use window information in Table functions
> 
>
> Key: FLINK-10408
> URL: https://issues.apache.org/jira/browse/FLINK-10408
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
> Environment: Flink v1.5.3
>Reporter: Alexis Sarda-Espinosa
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> Given the examples shown in 
> [here|https://flink.apache.org/news/2017/03/29/table-sql-api-update.html], I 
> assume the following should work: 
> {code:java}
> ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(execEnv);
> DataSource> source = execEnv.fromElements(
> new Tuple3<>(Timestamp.valueOf("2018-09-20 22:00:00"), "a", 1.3),
> new Tuple3<>(Timestamp.valueOf("2018-09-20 22:01:00"), "a", 2.1),
> new Tuple3<>(Timestamp.valueOf("2018-09-20 22:02:00"), "a", 3.0),
> new Tuple3<>(Timestamp.valueOf("2018-09-20 22:00:00"), "b", 2.2),
> new Tuple3<>(Timestamp.valueOf("2018-09-20 22:01:00"), "b", 1.8)
> );
> Table table = tableEnv.fromDataSet(source)
> .window(Slide.over("2.minutes").every("1.minute").on("f0").as("w"))
> .groupBy("f1, w")
> .select("(f2 * (f0 - w.start).exp() / 1.hour).sum" +
> " / ((f0 - w.start).exp() / 1.hour).sum");
> tableEnv.toDataSet(table, Row.class).print();
> {code}
> However, I get the following exception:
> {code:java}
> org.apache.flink.table.api.ValidationException: Cannot resolve [w] given 
> input [f0, f1, f2]
> {code}
> Maybe related to Flink-6618?
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14565) Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14565:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor 
pull-request-available  (was: auto-deprioritized-major pull-request-available 
stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed
> 
>
> Key: FLINK-14565
> URL: https://issues.apache.org/jira/browse/FLINK-14565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Reporter: Zili Chen
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we start SystemResourcesCounter when initialize 
> (JM|TM)MetricGroup. This thread doesn't exit on (JM|TM)MetricGroup closed and 
> even there is not exit logic of them.
> It possibly causes thread leak. For example, on our platform which supports 
> previewing sample SQL execution, it starts a MiniCluster in the same process 
> as the platform. When the preview job finished MiniCluster closed and also 
> (JM|TM)MetricGroup. However these SystemResourcesCounter threads remain.
> I propose when creating SystemResourcesCounter, track it in 
> (JM|TM)MetricGroup, and on (JM|TM)MetricGroup closed, shutdown 
> SystemResourcesCounter. This way, we survive from thread leaks.
> CC [~chesnay] [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-16682) Make batch size configurable for orc reader

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16682:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Make batch size configurable for orc reader
> ---
>
> Key: FLINK-16682
> URL: https://issues.apache.org/jira/browse/FLINK-16682
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Rui Li
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14523) Flink Stops Consuming from Kafka after Leader Election

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14523:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Flink Stops Consuming from Kafka after Leader Election
> --
>
> Key: FLINK-14523
> URL: https://issues.apache.org/jira/browse/FLINK-14523
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
> Environment: In AWS we run the app on EMR with the following versions:
>  * EMR Release - emr-5.15.0
>  * Hadoop distribution - Amazon 2.8.3
>  * Flink - Flink 1.4.2
> We submit the job to the cluster as an EMR step using ***command-runner.jar*. 
> We submit the job with the following arguments:
> {code:java}
> "Args": [
>   "flink", "run", "-m", "yarn-cluster", 
>   "-c", "com.salesforce.sde.streamingsearches.StreamingSearchesJob", 
>   "-yst", "-ys", "4", "-yn", "10", "-yjm", "2800", "-ytm", "2800",
>   "-ynm", "streaming-searches-prod",
>   "-d", "/home/hadoop/streaming-searches-1.0-SNAPSHOT.jar"
> ]{code}
> Additionally we build our application jar with Flink 1.4.2 and Kafka 0.11.
>  
>Reporter: James B. Fitzgerald
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> We have a Flink application running in AWS on EMR that streams input from a 
> single Kafka topic. Whenever there is a Kafka leader election for any 
> partition of the input topic, our Flink application stops consuming from 
> Kafka entirely. To begin consuming from Kafka again the YARN app must be 
> killed and restarted. We run this same application on premises and in AWS on 
> EMR. We have only observed this behavior when it is running on EMR. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14616) Clarify the ordering guarantees in the "The Broadcast State Pattern"

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14616:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor broadcast  
(was: auto-deprioritized-major broadcast stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Clarify the ordering guarantees in the "The Broadcast State Pattern"
> 
>
> Key: FLINK-14616
> URL: https://issues.apache.org/jira/browse/FLINK-14616
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Documentation
>Affects Versions: 1.9.1
>Reporter: Filip Niksic
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> broadcast
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When talking about the order of events in [The Broadcast State 
> Pattern|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/broadcast_state.html#important-considerations],
>  the current documentation states that the downstream tasks must not assume 
> the broadcast events to be ordered. However, this seems to be imprecise. 
> According to the response I got from [~fhueske] to a 
> [question|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Ordered-events-in-broadcast-state-tp30879.html]
>  I sent to the Flink user mailing list:
> {quote}The order of broadcasted inputs is not guaranteed when the operator 
> that broadcasts its output has a parallelism > 1 because the tasks that 
> receive the broadcasted input consume the records in "random" order from 
> their input channels.
> {quote}
> In particular, when the parallelism of the broadcasting operator is 1, the 
> order _is_ guaranteed.
> [~fhueske] continues with his suggestions on how to ensure the correct 
> ordering of the broadcast events:
> {quote}So there are two approaches:
> 1) make the operator that broadcasts its output run as an operator with 
> parallelism 1 (or add a MapOperator with parallelism 1 that just forwards its 
> input). This will cause all broadcasted records to go through the same 
> network channel and their order is guaranteed on each receiver.
> 2) use timestamps of broadcasted records for ordering and watermarks to 
> reason about completeness.
> If the broadcasted data is (comparatively) small in volume (which is usually 
> given because otherwise broadcasting would be expensive), I'd go with the 
> first option.
> The second approach is more difficult to implement.
> {quote}
> It would be great if the ordering guarantees could be clarified to avoid 
> confusion. This could be achieved by simply expanding the paragraph that 
> talks about the order of events in the "important considerations" section. 
> More ambitiously, the suggestions given by [~fhueske] could be turned into 
> examples.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-14615) Add Flink Web UI capabilities for savepoint

2021-11-21 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14615:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor usability  
(was: auto-deprioritized-major stale-minor usability)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Add Flink Web UI capabilities for savepoint
> ---
>
> Key: FLINK-14615
> URL: https://issues.apache.org/jira/browse/FLINK-14615
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter:  Mario Georgiev
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> Having the ability to do the following would greatly simplify Kubernetes/any 
> cluster deployment that utilizes a single cluster for multiple jobs, rather 
> than job per cluster.
> My proposal is the following: 
> Have a way to trigger savepoint from the UI for a job and have a way to get 
> the savepoint's progress somewhere in the UI.
> Have a way to deploy a new job/existing job from the UI using an existing 
> savepoint and choosing between --allowNonRestoredState or not allowing such 
> state at all. 
> Have a way to trigger savepoints at scheduled interval for job or group of 
> jobs ( this could very well be a second iteration since this would require 
> persistence for the job schedules )



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] MartijnVisser commented on pull request #17808: [FLINK-24928][flink-runtime-web] Typing improvements for Flink UI

2021-11-21 Thread GitBox


MartijnVisser commented on pull request #17808:
URL: https://github.com/apache/flink/pull/17808#issuecomment-974894700


   @AHeise Please merge :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2021-11-21 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447121#comment-17447121
 ] 

Shen Zhu commented on FLINK-24939:
--

Hey Yubin([~liyubin117] ),

Thanks for your feedback:)

Per my understanding SHOW CREATE statements are used to show how a given object 
can be created, in order to get Catalog information, we could use SHOW CATALOGS.

Best,
Shen

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Priority: Major
>
> SHOW CREATE CATALOG ;
>  
> `Catalog` is playing a more import role in flink, it would be great to get 
> existing catalog detail information



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-24974:
-
Affects Version/s: (was: 1.13.3)
   (was: shaded-14.0)

> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: 1
>Reporter: Zunyao Mao
>Priority: Major
> Fix For: 1.7.3
>
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-24974.

Resolution: Invalid

> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
> Environment: 
>Reporter: Zunyao Mao
>Priority: Major
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-24974:
-
Environment: 



  was:1


> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: 
>Reporter: Zunyao Mao
>Priority: Major
> Fix For: 1.7.3
>
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-24974:
-
Fix Version/s: (was: 1.7.3)

> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: 
>Reporter: Zunyao Mao
>Priority: Major
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-24974:
-
Component/s: (was: Tests)

> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
> Environment: 
>Reporter: Zunyao Mao
>Priority: Major
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (FLINK-24974) blink just translate select ... from a,b to nested loop join.

2021-11-21 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-24974:
--

> blink just translate select ... from a,b to nested loop join.
> -
>
> Key: FLINK-24974
> URL: https://issues.apache.org/jira/browse/FLINK-24974
> Project: Flink
>  Issue Type: Bug
> Environment: 
>Reporter: Zunyao Mao
>Priority: Major
>
> 1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 96a6ba2a83ca22091099ef36201afcbd8e719243 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26794)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-21 Thread GitBox


zhipeng93 commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r753647934



##
File path: 
flink-ml-api/src/main/java/org/apache/flink/ml/linalg/DenseVector.java
##
@@ -42,14 +45,26 @@ public double get(int i) {
 return values[i];
 }
 
+@Override
+public void set(int i, double val) {
+values[i] = val;
+}
+
 @Override
 public double[] toArray() {
 return values;
 }
 
 @Override
 public String toString() {
-return Arrays.toString(values);
+StringBuilder sbd = new StringBuilder();

Review comment:
   why is this method needed? Given that we already have 
`DenseVectorSerializer`?

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/MapPartitionFunctionWrapper.java
##
@@ -0,0 +1,68 @@
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+/**
+ * MapPartitionFunction wrapper.
+ *
+ * @param  Input element type.
+ * @param  Output element type.
+ */
+public class MapPartitionFunctionWrapper extends 
AbstractStreamOperator

Review comment:
   Can we remove this class and use the existing one 
`org.apache.flink.ml.common.datastream.MapPartitionFunctionWrapper`?

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java
##
@@ -0,0 +1,32 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** knn fit parameters. */
+public interface KnnParams
+extends WithParams,
+HasVectorColDefaultAsNull,
+HasLabelCol,
+HasFeatureColsDefaultAsNull,
+HasPredictionCol {
+/**
+ * @cn-name topK
+ * @cn topK
+ */
+Param K = new IntParam("k", "k", 10, ParamValidators.gt(0));

Review comment:
   Can we make `K` a shared param?

##
File path: 
flink-ml-api/src/main/java/org/apache/flink/ml/linalg/DenseVector.java
##
@@ -60,6 +75,74 @@ public boolean equals(Object obj) {
 return Arrays.equals(values, ((DenseVector) obj).values);
 }
 
+/**
+ * Parse the dense vector from a formatted string.
+ *
+ * The format of a dense vector is space separated values such as "1 2 
3 4".
+ *
+ * @param str A string of space separated values.
+ * @return The parsed vector.
+ */
+public static DenseVector fromString(String str) {

Review comment:
   Could this method be a utility function? It may not be part of the math 
library.

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/Knn.java
##
@@ -0,0 +1,255 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.MapPartitionFunctionWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import 

[GitHub] [flink] godfreyhe commented on a change in pull request #17666: [FLINK-21327][table-planner-blink] Support window TVF in batch mode

2021-11-21 Thread GitBox


godfreyhe commented on a change in pull request #17666:
URL: https://github.com/apache/flink/pull/17666#discussion_r753816404



##
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import java.sql.Timestamp
+
+import org.junit.{Before, Test}
+
+class WindowTableFunctionTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def before(): Unit = {
+util.addTableSource[(Timestamp, Long, Int, String)]("MyTable", 'ts, 'a, 
'b, 'c)
+util.addTableSource[(Int, Long, String, Int, Timestamp)]("MyTable1", 'a, 
'b, 'c, 'd, 'ts)
+util.tableEnv.executeSql(
+  s"""
+ |create table MyTable2 (
+ |  a int,
+ |  b bigint,
+ |  c as proctime()
+ |) with (
+ |  'connector' = 'COLLECTION'
+ |)
+ |""".stripMargin)
+  }
+
+  @Test
+  def testInvalidTimeColType(): Unit = {
+val sql =
+  """
+|SELECT *
+|FROM TABLE(TUMBLE(TABLE MyTable1, DESCRIPTOR(b), INTERVAL '15' 
MINUTE))
+|""".stripMargin
+expectedException.expect(classOf[ValidationException])
+expectedException.expectMessage(
+  "The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), 
datetime interval"
++ "[, datetime interval]) requires the timecol to be TIMESTAMP or 
TIMESTAMP_LTZ, "
++ "but is BIGINT.")
+util.verifyExplain(sql)
+  }
+
+  @Test
+  def testTumbleTVF(): Unit = {
+val sql =
+  """
+|SELECT *
+|FROM TABLE(TUMBLE(TABLE MyTable1, DESCRIPTOR(ts), INTERVAL '15' 
MINUTE))
+|""".stripMargin
+util.verifyExplain(sql)

Review comment:
   please use `verifyExecPlan`

##
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala
##
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, 
LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.TestData._
+import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
+
+import org.junit.{Before, Test}
+
+class WindowTableFunctionITCase extends BatchTestBase {

Review comment:
   we should add a WindowTableFunctionITCase for streaming

##
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
##
@@ -99,8 +100,10 @@ class WindowTableFunctionTest extends TableTestBase {
 |FROM TABLE(
 | TUMBLE(TABLE v1, DESCRIPTOR(cur_time), INTERVAL '15' MINUTE))
 |""".stripMargin
-thrown.expectMessage("requires the timecol is a time attribute type, but 
is 

[GitHub] [flink] flinkbot edited a comment on pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17554:
URL: https://github.com/apache/flink/pull/17554#issuecomment-950550174






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] MonsterChenzhuo commented on pull request #17508: [FLINK-24351][docs] Translate "JSON Function" pages into Chinese

2021-11-21 Thread GitBox


MonsterChenzhuo commented on pull request #17508:
URL: https://github.com/apache/flink/pull/17508#issuecomment-974634974


   @RocMarshal Please go to #17789 to review 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe commented on a change in pull request #17670: [FLINK-24760][docs] Update user document for batch window tvf

2021-11-21 Thread GitBox


godfreyhe commented on a change in pull request #17670:
URL: https://github.com/apache/flink/pull/17670#discussion_r753819169



##
File path: docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
##
@@ -48,15 +48,21 @@ See more how to apply further computations based on 
windowing TVF:
 
 ## Window Functions
 
-Apache Flink provides 3 built-in windowing TVFs: `TUMBLE`, `HOP` and 
`CUMULATE`. The return value of windowing TVF is a new relation that includes 
all columns of original relation as well as additional 3 columns named 
"window_start", "window_end", "window_time" to indicate the assigned window. 
The "window_time" field is a [time attributes]({{< ref 
"docs/dev/table/concepts/time_attributes" >}}) of the window after windowing 
TVF which can be used in subsequent time-based operations, e.g. another 
windowing TVF, or }}#interval-joins">interval joins, }}">over aggregations. The value of 
`window_time` always equal to `window_end - 1ms`.
+Apache Flink provides 3 built-in windowing TVFs: `TUMBLE`, `HOP` and 
`CUMULATE`. The return value of windowing TVF is a new relation that includes 
all columns of original relation as well as additional 3 columns named 
"window_start", "window_end", "window_time" to indicate the assigned window. 
+For SQL queries on streaming tables, the "window_time" field is a [time 
attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) of the 
window. 

Review comment:
   ditto

##
File path: docs/content.zh/docs/dev/table/sql/queries/window-agg.md
##
@@ -40,7 +40,9 @@ Unlike other aggregations on continuous tables, window 
aggregation do not emit i
 
 ### Windowing TVFs
 
-Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations, 
which can be defined on either [event or processing time attributes]({{< ref 
"docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref 
"docs/dev/table/sql/queries/window-tvf" >}}) for more windowing functions 
information.
+Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations.
+For SQL queries on streaming tables, the time attribute field of a window 
table-valued function must be on either [event or processing time 
attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See 
[Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more 
windowing functions information.
+For SQL on batch tables, the time attribute field of a window table-valued 
function must be an attribute of type `TIMESTAMP` or `TIMESTAMP_LTZ`. 

Review comment:
   For batch queries

##
File path: docs/content.zh/docs/dev/table/sql/queries/window-agg.md
##
@@ -40,7 +40,9 @@ Unlike other aggregations on continuous tables, window 
aggregation do not emit i
 
 ### Windowing TVFs
 
-Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations, 
which can be defined on either [event or processing time attributes]({{< ref 
"docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref 
"docs/dev/table/sql/queries/window-tvf" >}}) for more windowing functions 
information.
+Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations.
+For SQL queries on streaming tables, the time attribute field of a window 
table-valued function must be on either [event or processing time 
attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See 
[Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more 
windowing functions information.

Review comment:
   For streaming queries




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17133: [FLINK-24138] Architectural tests

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #17133:
URL: https://github.com/apache/flink/pull/17133#issuecomment-912323602






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-11-21 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   3   4   >