[jira] [Updated] (FLINK-27193) Kryo Serialisation and Deserialisation return a different object

2022-04-11 Thread Akshay Hazari (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Hazari updated FLINK-27193:
--
Description: 
We have a unit test to check that Kryo serialisation and deserialisation result 
in the same value, but it fails.

The KryoSerializer and deserializer are used like this:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer
class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<out T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
{code}
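
As a point of comparison, here is a minimal Java sketch of the same round trip using the same Flink classes (the record type and value are placeholders, not taken from this report). It copies only the bytes actually written via {{getCopyOfBuffer()}}, whereas {{sharedBuffer}} returns the whole internal array; this is an illustration of the API, not a diagnosis of the failure below.
{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

public class KryoRoundTrip {

    // Serialize a value with Flink's KryoSerializer and read it straight back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException {
        ExecutionConfig config = new ExecutionConfig();
        config.registerKryoType(value.getClass());

        KryoSerializer<T> serializer =
                new KryoSerializer<>((Class<T>) value.getClass(), config);

        DataOutputSerializer output = new DataOutputSerializer(64);
        serializer.serialize(value, output);

        // getCopyOfBuffer() returns only the bytes that were written;
        // getSharedBuffer() would return the whole internal array.
        byte[] bytes = output.getCopyOfBuffer();

        DataInputDeserializer input = new DataInputDeserializer(bytes);
        return serializer.deserialize(input);
    }
}
{code}
Whether the final assertion holds then depends on the equals() implementations of the record and its nested components (for example, the sketch objects printed in the assertion output below may not define value-based equality), so a mismatch can be reported even when the bytes round-trip correctly.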
 

The unit test simply looks like this:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}
{code}
This is the actual vs expected assertion error.

The record is huge and all of its components hash to a different value. I am 
not sure how the record is modified.
{code:java}
org.opentest4j.AssertionFailedError: 
expected: "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.00e+00
   Max Value                    : 3.00e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.00e+00
   Max Value                    : 3.00e+00
### END SKETCH SUMMARY
, 

[jira] [Updated] (FLINK-27193) Kryo Serialisation and Deserialisation return a different object

2022-04-11 Thread Akshay Hazari (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Hazari updated FLINK-27193:
--
Priority: Minor  (was: Not a Priority)

> Kryo Serialisation and Deserialisation return a different object 
> --
>
> Key: FLINK-27193
> URL: https://issues.apache.org/jira/browse/FLINK-27193
> Project: Flink
>  Issue Type: Bug
>Reporter: Akshay Hazari
>Priority: Minor
>
> We have a unit test to check that Kryo serialisation and deserialisation 
> result in the same value, but it fails.
> The KryoSerializer and deserializer are used like this:
> {code:java}
> import kotlin.reflect.KClass
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> import org.apache.flink.core.memory.DataInputDeserializer
> import org.apache.flink.core.memory.DataOutputSerializer
> class KryoSerializerExtension {
>     fun <T : Any> serde(t: T): T {
>         val bytes = serialize(t)
>         return deserialize(bytes, t::class)
>     }
>
>     fun serialize(any: Any): ByteArray {
>         val config = ExecutionConfig()
>         config.registerKryoType(any.javaClass)
>         val serializer = KryoSerializer(any.javaClass, config)
>         val output = DataOutputSerializer(1)
>         serializer.serialize(any, output)
>         return output.sharedBuffer
>     }
>
>     fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<out T>): T {
>         val config = ExecutionConfig()
>         config.registerKryoType(kClass.java)
>         val serializer = KryoSerializer(kClass.java, config)
>         val input = DataInputDeserializer(bytes)
>         return serializer.deserialize(input)
>     }
> }
> {code}
>  
> The unit test simply looks like this:
> {code:java}
> @Test
> fun fieldRecord() {
>     val record = getFieldRecord()
>     val result = kryo.serde(record)
>     assertThat(result).isEqualTo(record)
> }
> {code}
> This is the actual vs expected assertion error.
> The record is huge and all of its components hash to a different value. I am 
> not sure what Kryo does that modifies the record.
> {code:java}
> org.opentest4j.AssertionFailedError: 
> expected: "FieldRecord(name=foo, type=string, 
> fieldCounter=FieldCounter(populated=1, missing=3), 
> countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
> frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, Capacity bytes       : false, 
>    Estimation Mode              : false
>    K                            : 128
>    N                            : 2
>    Levels (Needed, Total, Valid): 0, 0, 0
>    Level Bit Pattern            : 0
>    BaseBufferCount              : 2
>    Combined Buffer Capacity     : 4
>    Retained Items               : 2
>    Compact Storage Bytes        : 48
>    Updatable Storage Bytes      : 64
>    Normalized Rank Error        : 1.406%
>    Normalized Rank Error (PMF)  : 1.711%
>    Min Value                    : 1.00e+00
>    Max Value                    : 3.00e+00
> ### END SKETCH SUMMARY
> , 
> numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
> mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
> but was : "FieldRecord(name=foo, type=string, 
> fieldCounter=FieldCounter(populated=1, missing=3), 
> countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
> frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, 

[jira] [Updated] (FLINK-27193) Kryo Serialisation and Deserialisation return a different object

2022-04-11 Thread Akshay Hazari (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Hazari updated FLINK-27193:
--
Description: 
We have a unit test to check that Kryo serialisation and deserialisation result 
in the same value, but it fails.

The KryoSerializer and deserializer are used like this:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer
class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<out T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
{code}

 

The unit test simply looks like this:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}
{code}

This is the actual vs expected assertion error.

The record is huge and all of its components hash to a different value. I am 
not sure what Kryo does that modifies the record.
{code:java}
org.opentest4j.AssertionFailedError: 
expected: "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.00e+00
   Max Value                    : 3.00e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.00e+00
   Max Value                    : 3.00e+00
### END SKETCH SUMMARY
, 

[jira] [Created] (FLINK-27193) Kryo Serialisation and Deserialisation return a different object

2022-04-11 Thread Akshay Hazari (Jira)
Akshay Hazari created FLINK-27193:
-

 Summary: Kryo Serialisation and Deserialisation return a 
different object 
 Key: FLINK-27193
 URL: https://issues.apache.org/jira/browse/FLINK-27193
 Project: Flink
  Issue Type: Bug
Reporter: Akshay Hazari


We have a unit test to check that Kryo serialisation and deserialisation result 
in the same value, but it fails.

The KryoSerializer and deserializer are used like this:
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<out T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
 

It simply looks like this:

@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}
This is the actual vs expected assertion error.

The record is huge and all of its components hash to a different value. I am 
not sure what Kryo does that modifies the record.
org.opentest4j.AssertionFailedError: 

expected: "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK: 10
  Merge Flag : false
  Error Const: 0.5887050112577373
  RSE: 0.01839703160180429
  Seed Hash  : 93cc | 37836
  Num Coupons: 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP: 1023.375
  HIP Accum  : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty: false
   Direct, Capacity bytes   : false, 
   Estimation Mode  : false
   K: 128
   N: 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern: 0
   BaseBufferCount  : 2
   Combined Buffer Capacity : 4
   Retained Items   : 2
   Compact Storage Bytes: 48
   Updatable Storage Bytes  : 64
   Normalized Rank Error: 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value: 1.00e+00
   Max Value: 3.00e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"

but was : "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK: 10
  Merge Flag : false
  Error Const: 0.5887050112577373
  RSE: 0.01839703160180429
  Seed Hash  : 93cc | 37836
  Num Coupons: 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP: 1023.375
  HIP Accum  : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty: false
   Direct, Capacity bytes   : false, 
   Estimation Mode  : false
   K: 128
   N: 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern: 0
   BaseBufferCount  : 2
   Combined Buffer Capacity : 4
   Retained Items   : 2
   Compact Storage Bytes: 48
   Updatable Storage Bytes  : 64
   Normalized Rank Error: 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value: 1.00e+00
   Max 

[jira] [Commented] (FLINK-27190) Revisit error handling in main reconcile() loop

2022-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520915#comment-17520915
 ] 

Gyula Fora commented on FLINK-27190:


What improvements do you suggest?

> Revisit error handling in main reconcile() loop
> ---
>
> Key: FLINK-27190
> URL: https://issues.apache.org/jira/browse/FLINK-27190
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
>
> There are some improvements introduced around error handling:
>  * [https://github.com/java-operator-sdk/java-operator-sdk/pull/1033]
>  in the upcoming java-operator-sdk release 
> [v3.0.0.RC1.|https://github.com/java-operator-sdk/java-operator-sdk/releases/tag]
>  We should revisit and further simplify the error logic in 
> {{FlinkDeploymentController.reconcile()}}.
> Currently:
>  * checked exceptions are wrapped in runtime exceptions
>  * validation errors are terminal errors but are handled differently (a generic sketch of both styles follows below)
>  
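
For illustration only (none of this is the actual FlinkDeploymentController code), a generic sketch of the two styles named above: wrapping a checked exception in a runtime exception so it propagates and is retried, versus treating a validation failure as a terminal error that is recorded on status rather than rethrown.
{code:java}
// Generic illustration of the two error-handling styles mentioned above.
// None of these types come from the operator code base.
public class ReconcileErrorHandling {

    static class ValidationException extends RuntimeException {
        ValidationException(String message) { super(message); }
    }

    interface Status {
        void setError(String message);
    }

    void reconcile(Status status) {
        try {
            doCheckedWork();                      // throws a checked exception
        } catch (java.io.IOException e) {
            // Style 1: wrap the checked exception so it propagates and is retried.
            throw new RuntimeException(e);
        }

        try {
            validateSpec();
        } catch (ValidationException e) {
            // Style 2: a terminal error; record it on the resource status
            // instead of rethrowing, so the reconciliation is not retried.
            status.setError(e.getMessage());
        }
    }

    void doCheckedWork() throws java.io.IOException {}

    void validateSpec() {}
}
{code}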



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] liuzhuang2017 commented on a diff in pull request #19413: [FLINK-16078] [docs-zh] Translate "Tuning Checkpoints and Large State…

2022-04-11 Thread GitBox


liuzhuang2017 commented on code in PR #19413:
URL: https://github.com/apache/flink/pull/19413#discussion_r847981768


##
docs/content.zh/docs/ops/state/large_state_tuning.md:
##
@@ -166,149 +125,101 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 }
 ```
 
-## Capacity Planning
-
-This section discusses how to decide how many resources should be used for a 
Flink job to run reliably.
-The basic rules of thumb for capacity planning are:
-
-  - Normal operation should have enough capacity to not operate under constant 
*back pressure*.
-See [back pressure monitoring]({{< ref "docs/ops/monitoring/back_pressure" 
>}}) for details on how to check whether the application runs under back 
pressure.
-
-  - Provision some extra resources on top of the resources needed to run the 
program back-pressure-free during failure-free time.
-These resources are needed to "catch up" with the input data that 
accumulated during the time the application
-was recovering.
-How much that should be depends on how long recovery operations usually 
take (which depends on the size of the state
-that needs to be loaded into the new TaskManagers on a failover) and how 
fast the scenario requires failures to recover.
-
-*Important*: The base line should to be established with checkpointing 
activated, because checkpointing ties up
-some amount of resources (such as network bandwidth).
-
-  - Temporary back pressure is usually okay, and an essential part of 
execution flow control during load spikes,
-during catch-up phases, or when external systems (that are written to in a 
sink) exhibit temporary slowdown.
+## 容量规划
+本节讨论如何确定 Flink 作业应该使用多少资源才能可靠地运行。
+容量规划的基本经验法则是:
+  - 正常运行应该有足够的能力在恒定的*反压*下运行。

Review Comment:
   I opened a hotfix PR for this at https://github.com/apache/flink/pull/19429; please help take a look, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27165) Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString became unstable

2022-04-11 Thread Anton Kalashnikov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520914#comment-17520914
 ] 

Anton Kalashnikov commented on FLINK-27165:
---

I can try to take a look. But I don't see how it relates to Java 11, since 
there were no changes at all for Java 8 (only a new Jenkins build for Java 11 
was added). Also, according to this graph, Java 11 was added a little later 
than the instabilities first appeared (you can compare Java 8 and Java 11 
[here|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,6=serializerHeavyString=on=on=off=2=1000]).

> Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString 
> became unstable
> -
>
> Key: FLINK-27165
> URL: https://issues.apache.org/jira/browse/FLINK-27165
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> The benchmark test 
> {{SerializationFrameworkMiniBenchmarks#serializerHeavyString}} became 
> unstable mid of January 2022 which cannot be explained (see 
> [graph|http://codespeed.dak8s.net:8000/timeline/#/?exe=1=serializerHeavyString=on=on=off=2=1000]).
>  
> There is some suspicion that it was caused by FLINK-25246 because Java 11 was 
> added during that time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

2022-04-11 Thread GitBox


masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847968836


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java:
##
@@ -391,7 +394,7 @@ static void serializeKeyedStateHandle(KeyedStateHandle 
stateHandle, DataOutputSt
 dos.writeLong(handle.getStateSize());
 dos.writeLong(handle.getCheckpointedSize());
 writeStateHandleId(handle, dos);
-
+dos.writeUTF(handle.getStorageIdentifier());

Review Comment:
   I am thinking a more common case about switching the change log storage.
   There are some problems in current implementation which use same change log 
storage to read and write.
   I think we should use original change log storage to read and use new one to 
write.
   It also make sense for switching from enabled to disabled.
   But we cannot get original change log storage currently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27192) Parquet File Read From Local

2022-04-11 Thread Karthik (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthik updated FLINK-27192:

Issue Type: Technical Debt  (was: Bug)

> Parquet File Read From Local
> 
>
> Key: FLINK-27192
> URL: https://issues.apache.org/jira/browse/FLINK-27192
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Client
>Affects Versions: 1.13.6
> Environment: Flink-1.13.6
> Scala-2.11
>  
>Reporter: Karthik
>Priority: Major
> Fix For: 1.13.6
>
>
> Hi, I want to read a Parquet file from my local machine.
> But it is asking for a Hadoop config.
> Note: I'm using this jar: flink-sql-parquet_2.11-1.13.6.jar
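
For reference, a minimal sketch of how a local Parquet file is typically declared through the filesystem connector (table name, schema and path are made up for illustration, not taken from this report); the Parquet format pulls in Hadoop classes, which is presumably why a Hadoop configuration is being requested.
{code:java}
// Minimal sketch: declaring and reading a local Parquet file through the
// filesystem connector. Table name, schema and path are illustrative only.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LocalParquetRead {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE local_parquet ("
                        + "  id BIGINT,"
                        + "  name STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///tmp/data.parquet',"
                        + "  'format' = 'parquet'"
                        + ")");

        tEnv.executeSql("SELECT * FROM local_parquet").print();
    }
}
{code}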



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'

2022-04-11 Thread GitBox


JingsongLi commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847944688


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} to evaluate {@code filter like 'abc%' or filter like 
'abc_'}. */
+public class StartsWith implements Predicate {

Review Comment:
   `like 'abc_'` does not look like a `StartsWith`.
   We can add a separate class to support it.
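
   As a standalone illustration of that distinction (plain Java, not tied to the table store `Predicate` interface): `'abc%'` is a pure prefix match, while `'abc_'` constrains both the prefix and the total length, so it needs different evaluation logic.

   ```java
   // Standalone sketch, not flink-table-store code: the `_` wildcard matches
   // exactly one character, so 'abc_' is a prefix match plus a length check.
   public final class LikePatterns {

       // LIKE 'abc%': any string that starts with "abc".
       static boolean likeAbcPercent(String value) {
           return value != null && value.startsWith("abc");
       }

       // LIKE 'abc_': "abc" followed by exactly one arbitrary character.
       static boolean likeAbcUnderscore(String value) {
           return value != null && value.length() == 4 && value.startsWith("abc");
       }

       public static void main(String[] args) {
           System.out.println(likeAbcPercent("abcdef"));      // true
           System.out.println(likeAbcUnderscore("abcdef"));   // false
           System.out.println(likeAbcUnderscore("abcd"));     // true
       }
   }
   ```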



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'

2022-04-11 Thread GitBox


JingsongLi commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847942602


##
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java:
##
@@ -325,11 +345,29 @@ public void testUnsupportedExpression() {
 field(0, DataTypes.INT()),
 literal(3)),
 call(
-BuiltInFunctionDefinitions.LIKE,
+BuiltInFunctionDefinitions.SIMILAR,
 field(1, DataTypes.INT()),
 literal(5)));
 assertThatThrownBy(() -> expression.accept(CONVERTER))
 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+CallExpression endPattern =

Review Comment:
   There could be a `testUnsupportedLike` test that covers LIKE only.
   The test cases could be:
   - equalsPattern/endPattern/middlePattern
   - equalsPattern/endPattern/middlePattern with escape
   - equalsPattern/endPattern/middlePattern with '_'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.

2022-04-11 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-26761:
---
Fix Version/s: 1.16.0

> Fix the cast exception thrown by PreValidateReWriter when insert 
> into/overwrite a partitioned table.
> 
>
> Key: FLINK-26761
> URL: https://issues.apache.org/jira/browse/FLINK-26761
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: zoucao
>Priority: Major
> Fix For: 1.16.0
>
>
> In `PreValidateReWriter#appendPartitionAndNullsProjects`, we should use
> {code:java}
> val names = sqlInsert.getTargetTableID.asInstanceOf[SqlIdentifier].names
> {code}
> to get the table name, instead of
> {code:java}
> val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
> {code}
> when we execute the following sql:
> {code:java}
> insert into/overwrite table_name /*+ options(xxx) */ partition(xxx) select  
> {code}
> invoking `sqlInsert.getTargetTable` will return a SqlTableRef, which cannot be 
> cast to SqlIdentifier.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.

2022-04-11 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520902#comment-17520902
 ] 

godfrey he commented on FLINK-26761:


[~zoucao] You are right, it's a bug. Would you like to fix it?

> Fix the cast exception thrown by PreValidateReWriter when insert 
> into/overwrite a partitioned table.
> 
>
> Key: FLINK-26761
> URL: https://issues.apache.org/jira/browse/FLINK-26761
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: zoucao
>Priority: Major
>
> In `PreValidateReWriter#appendPartitionAndNullsProjects`, we should use
> {code:java}
> val names = sqlInsert.getTargetTableID.asInstanceOf[SqlIdentifier].names
> {code}
> to get the table name, instead of
> {code:java}
> val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
> {code}
> when we execute the following sql:
> {code:java}
> insert into/overwrite table_name /*+ options(xxx) */ partition(xxx) select  
> {code}
> invoking `sqlInsert.getTargetTable` will return a SqlTableRef, which cannot be 
> cast to SqlIdentifier.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27192) Parquet File Read From Local

2022-04-11 Thread Karthik (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthik updated FLINK-27192:

  Component/s: Table SQL / API
   Table SQL / Client
Fix Version/s: 1.13.6
Affects Version/s: 1.13.6
  Description: 
Hi, I want to read a Parquet file from my local machine.

But it is asking for a Hadoop config.

Note: I'm using this jar: flink-sql-parquet_2.11-1.13.6.jar
  Environment: 
Flink-1.13.6

Scala-2.11

 
  Summary: Parquet File Read From Local  (was: Hi,)

> Parquet File Read From Local
> 
>
> Key: FLINK-27192
> URL: https://issues.apache.org/jira/browse/FLINK-27192
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Client
>Affects Versions: 1.13.6
> Environment: Flink-1.13.6
> Scala-2.11
>  
>Reporter: Karthik
>Priority: Major
> Fix For: 1.13.6
>
>
> Hi, I want to read a Parquet file from my local machine.
> But it is asking for a Hadoop config.
> Note: I'm using this jar: flink-sql-parquet_2.11-1.13.6.jar



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27189) Support to always add quotes for string data in CSV format

2022-04-11 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27189:

Summary: Support to always add quotes for string data in CSV format  (was: 
Ability for table api to always add quotes to generated csv)

> Support to always add quotes for string data in CSV format
> --
>
> Key: FLINK-27189
> URL: https://issues.apache.org/jira/browse/FLINK-27189
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: shameet
>Priority: Major
>
> I am using the Table API in PyFlink to generate a CSV. What I noticed is 
> that it conditionally adds quotes around the data. What I want is quotes 
> around all the data.
> The CSV is being created in S3.
>  
>  
> E.g. in the output below, the data in the last column was not quoted:
> "[transaction_idgeorge.bl...@reqres.in|mailto:transaction_idgeorge.bl...@reqres.in]",card_hash,transaction_id
> "[no3500957594177george.bl...@reqres.in|mailto:no3500957594177george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
> "[no3500957594178george.bl...@reqres.in|mailto:no3500957594178george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
>  
>  
> I had posted this question to the user community and Dian Fu suggested I could 
> create a Jira issue as this is not supported right now:
> [https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn]
>  
>  
> Sample code to create a CSV in S3:
>  
> def create_source_table(table_name, input_path):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '{1}'
>     ) """.format(table_name, input_path)
>
> def create_sink_table(table_name, bucket_name):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100),
>         brand_id VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'path' = '{1}',
>         'format' = 'csv'
>     ) """.format(table_name, bucket_name)
>
> # 2. Creates a source table from a Kinesis Data Stream
> table_env.execute_sql(
>     create_source_table(input_table_name, input_file)
> )
> table_env.execute_sql(
>     create_sink_table(out_table_name, output_bucket_name)
> )
> table_env.register_function("addme1", addme1)
>
> source_table = table_env.from_path(input_table_name)
> source_table.select(
>     addme1(source_table.transaction_id),
>     source_table.card_hash,
>     source_table.transaction_id.alias('brand_id')
> ).execute_insert(out_table_name).wait()
>  
>  
> apache-flink version - 1.13
> python 3.8
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27189) Ability for table api to always add quotes to generated csv

2022-04-11 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27189:

Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
 (was: API / Python)

> Ability for table api to always add quotes to generated csv
> ---
>
> Key: FLINK-27189
> URL: https://issues.apache.org/jira/browse/FLINK-27189
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: shameet
>Priority: Major
>
> I am using the Table API in PyFlink to generate a CSV. What I noticed is 
> that it conditionally adds quotes around the data. What I want is quotes 
> around all the data.
> The CSV is being created in S3.
>  
>  
> E.g. in the output below, the data in the last column was not quoted:
> "[transaction_idgeorge.bl...@reqres.in|mailto:transaction_idgeorge.bl...@reqres.in]",card_hash,transaction_id
> "[no3500957594177george.bl...@reqres.in|mailto:no3500957594177george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
> "[no3500957594178george.bl...@reqres.in|mailto:no3500957594178george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
>  
>  
> I had posted this question to the user community and Dian Fu suggested I could 
> create a Jira issue as this is not supported right now:
> [https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn]
>  
>  
> Sample code to create a CSV in S3:
>  
> def create_source_table(table_name, input_path):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '{1}'
>     ) """.format(table_name, input_path)
>
> def create_sink_table(table_name, bucket_name):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100),
>         brand_id VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'path' = '{1}',
>         'format' = 'csv'
>     ) """.format(table_name, bucket_name)
>
> # 2. Creates a source table from a Kinesis Data Stream
> table_env.execute_sql(
>     create_source_table(input_table_name, input_file)
> )
> table_env.execute_sql(
>     create_sink_table(out_table_name, output_bucket_name)
> )
> table_env.register_function("addme1", addme1)
>
> source_table = table_env.from_path(input_table_name)
> source_table.select(
>     addme1(source_table.transaction_id),
>     source_table.card_hash,
>     source_table.transaction_id.alias('brand_id')
> ).execute_insert(out_table_name).wait()
>  
>  
> apache-flink version - 1.13
> python 3.8
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27192) Hi,

2022-04-11 Thread Karthik (Jira)
Karthik created FLINK-27192:
---

 Summary: Hi,
 Key: FLINK-27192
 URL: https://issues.apache.org/jira/browse/FLINK-27192
 Project: Flink
  Issue Type: Bug
Reporter: Karthik






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27138) flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE

2022-04-11 Thread yanbiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520899#comment-17520899
 ] 

yanbiao commented on FLINK-27138:
-

[~martijnvisser] Got it, thanks for your help :)

> flink1.14.0-sql-job submit failed:Flink doesn't support individual window 
> table-valued function TUMBLE
> --
>
> Key: FLINK-27138
> URL: https://issues.apache.org/jira/browse/FLINK-27138
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Table SQL / Client
>Affects Versions: 1.14.0
> Environment: CentOS-7
> flink 1.14.0 Release
>Reporter: yanbiao
>Priority: Critical
> Attachments: 3个or正常.png, webUI报错信息.png, 部署目录.png
>
>
> I am working with a standalone deployment of flink-1.14.0-bin-scala_2.11.
> The package was downloaded from the official website: 
> [https://flink.apache.org/downloads.html]
> Then I submit the job to the standalone cluster through the REST interface [ 
> /jars//run].
> In most conditions it works well (the where clause is simple or has no *or* 
> operator),
> but when I add some conditions with an or operator to the where clause, the 
> submission fails with an exception. Even more surprising is that
> [where A or B] and [where A or B or C] are OK, 
> but with more than 3 or conditions, like [where A or B or C or D], *it is not OK.*
> I have tried more situations and they behaved differently;
> it works well to add a true condition before an or condition, for example:
> [where 1=1 and A or B or C]
> *I have asked for advice in the official community DingDing group and WeChat 
> group;*
> *the admin told me that it should be a BUG and suggested opening an issue.*
> The following message is the stack trace:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Currently Flink doesn't support individual window 
> table-valued function TUMBLE(time_col=[ts], size=[10 min]).
>  Please use window table-valued function with the following computations:
> 1. aggregate using window_start and window_end as group keys.
> 2. topN using window_start and window_end as partition key.
> 3. join with join condition contains window starts equality of input tables 
> and window ends equality of input tables.
> The SQL content is: 
> CREATE TABLE source22 (
>     `timestamp` VARCHAR,
>     `logLevel` VARCHAR,
>     `threadName` VARCHAR,
>     `componentId` VARCHAR,
>     `stackTrace` VARCHAR,
>     `logType` VARCHAR,
>     `eventType` VARCHAR,
>     `subType` VARCHAR,
>     `operateType` VARCHAR,
>     `operateTag` VARCHAR,
>     `weight` INT,
>     `operator` VARCHAR,
>     `authRoles` VARCHAR,
>     `sourceHost` VARCHAR,
>     `restUri` VARCHAR,
>     `restMethod` VARCHAR,
>     `operateObj` VARCHAR,
>     `operateResult` VARCHAR,
>     `requestParams` VARCHAR,
>     `triggerCondition` VARCHAR,
>     `authType` VARCHAR,
>     `dataSize` INT,
>     `exceptionMsg` VARCHAR,
>     ts as TO_TIMESTAMP(`timestamp`,'-MM-dd HH:mm:ss.SSS'),
>     WATERMARK FOR  ts AS  ts - INTERVAL '10'second
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'json',
>     'properties.bootstrap.servers' = '10.192.78.27:9092',
>     'scan.startup.mode' = 'latest-offset',
>     'topic' = 'topic22',
>     'properties.group.id' = 'groupId_22'
> )
> CREATE TABLE sink22 (
>     `id` VARCHAR,
>     `rule_key` VARCHAR,
>     `rule_name` VARCHAR,
>     `metric_threshold` INT,
>     `audit_status` INT,
>     `audit_comment_num` INT,
>     `window_start` TIMESTAMP(3),
>     `window_end` TIMESTAMP(3),
>     `metric_count` BIGINT,
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>     'connector' = 'elasticsearch-7',
>     'hosts' = 'http://10.192.78.27:39200',
>     'index' = 'index22'
> )
> INSERT INTO sink22
> SELECT uuid() as id ,'22' as rule_key ,'4 or condition test' as rule_name ,2 
> as metric_threshold ,0 as audit_status ,0 as audit_comment_num 
> ,window_start,window_end ,count(*) as metric_count
> FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
> WHERE logType='operation' and (componentId='a' or componentId='b' or 
> componentId='c'  or componentId='d' )
> GROUP BY window_start,window_end
> HAVING count(*) >2
>  
> the main class in the jar is:
> public class AuditRuleJob {
>     public static void main(String[] args) {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().setGlobalJobParameters(params);
>         env.setParallelism(1);
>         
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
> 30));
>         env.enableCheckpointing(6);
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);                   
>       

[jira] [Assigned] (FLINK-26829) ClassCastException will be thrown when the second operand of divide is a function call

2022-04-11 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-26829:
--

Assignee: luoyuxia

> ClassCastException will be thrown  when the second operand of divide is a 
> function call
> ---
>
> Key: FLINK-26829
> URL: https://issues.apache.org/jira/browse/FLINK-26829
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> Can be reproduced by adding the following code to 
> SqlExpressionTest#testDivideFunctions
> {code:java}
> testExpectedSqlException(
>   "1/POWER(5, 5)", divisorZeroException, classOf[ArithmeticException]) {code}
> Then the method ExpressionReducer#skipAndValidateExprs will throw the 
> exception:
> {code:java}
> java.lang.ClassCastException: org.apache.calcite.rex.RexCall cannot be cast 
> to org.apache.calcite.rex.RexLiteral  {code}
> The following code casts the DIVIDE's second operand to RexLiteral, but it 
> may be a function call.
> {code:java}
> // according to BuiltInFunctionDefinitions, the DEVIDE's second op must be 
> numeric
> assert(RexUtil.isDeterministic(divisionLiteral))
> val divisionComparable = {
>   
> divisionLiteral.asInstanceOf[RexLiteral].getValue.asInstanceOf[Comparable[Any]]
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25242) UDF with primitive int argument does not accept int values even after a not null filter

2022-04-11 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520898#comment-17520898
 ] 

godfrey he commented on FLINK-25242:


good catch~

> UDF with primitive int argument does not accept int values even after a not 
> null filter
> ---
>
> Key: FLINK-25242
> URL: https://issues.apache.org/jira/browse/FLINK-25242
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Caizhi Weng
>Priority: Major
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
> issue.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   tEnv.executeSql("CREATE TEMPORARY FUNCTION MyUdf AS 
> 'org.apache.flink.table.api.MyUdf'")
>   tEnv.executeSql(
> """
>   |CREATE TABLE T (
>   |  a INT
>   |) WITH (
>   |  'connector' = 'values',
>   |  'bounded' = 'true'
>   |)
>   |""".stripMargin)
>   tEnv.executeSql("SELECT MyUdf(a) FROM T WHERE a IS NOT NULL").print()
> }
> {code}
> UDF code
> {code:scala}
> class MyUdf extends ScalarFunction {
>   def eval(a: Int): Int = {
> a + 1
>   }
> }
> {code}
> Exception stack
> {code}
> org.apache.flink.table.api.ValidationException: SQL validation failed. 
> Invalid function call:
> default_catalog.default_database.MyUdf(INT)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:168)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:219)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>   at 
> org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>

[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl

2022-04-11 Thread GitBox


weibozhao commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r847916030


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java:
##
@@ -37,60 +38,68 @@
 import java.io.OutputStream;
 
 /**
- * Model data of {@link LogisticRegressionModel}.
+ * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}.
  *
  * This class also provides methods to convert model data from Table to 
Datastream, and classes
  * to save/load model data.
  */
-public class LogisticRegressionModelData {
+public class LinearModelData {
 
 public DenseVector coefficient;
+public long modelVersion;

Review Comment:
   We need to talk about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl

2022-04-11 Thread GitBox


weibozhao commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r847915850


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java:
##
@@ -37,60 +38,68 @@
 import java.io.OutputStream;
 
 /**
- * Model data of {@link LogisticRegressionModel}.
+ * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}.
  *
  * This class also provides methods to convert model data from Table to 
Datastream, and classes
  * to save/load model data.
  */
-public class LogisticRegressionModelData {
+public class LinearModelData {

Review Comment:
   LinearModelData is shared by ftrl, lr, svm and linearReg, since these 
algorithms have the same model data format. We need to talk about this rename. 
If we rename it, I agree to move this linear model data to a common place. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-26757) change the default value of state.backend.rocksdb.restore-overlap-fraction-threshold

2022-04-11 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-26757.
--
Resolution: Fixed

merged in master: 4034d3cd6d13e88e2e5ca101510bf333e94a53fa

> change the default value of 
> state.backend.rocksdb.restore-overlap-fraction-threshold
> 
>
> Key: FLINK-26757
> URL: https://issues.apache.org/jira/browse/FLINK-26757
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: 截屏2022-03-21 上午11.50.28.png
>
>
> `state.backend.rocksdb.restore-overlap-fraction-threshold` is used to control 
> how a state handle is restored; different thresholds can affect restore 
> performance. The deletion behaviour during restore was changed by FLINK-21321.
> In theory, setting the default value to 0 is most suitable, since 
> `deleteRange()` takes less time than creating a new RocksDB instance and then 
> scanning and putting the records. We also have experimental data showing that 
> a default value of 0 is more suitable. Below is a comparison of 
> initialization times for different thresholds; the default value of 0 takes 
> less time (a configuration sketch follows below).
> !截屏2022-03-21 上午11.50.28.png!
>  
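
For reference, a sketch of how this option could be set explicitly from code (assuming the plain string-keyed Configuration API; the key is the one named in this ticket):
{code:java}
// Sketch only: setting the threshold explicitly via the string key from this
// ticket. Per the issue description, 0 favours reusing a restored instance
// plus deleteRange(), while higher values favour creating a new RocksDB
// instance and re-inserting records.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreOverlapThreshold {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setDouble("state.backend.rocksdb.restore-overlap-fraction-threshold", 0.0);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual
    }
}
{code}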



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] Myasuka merged pull request #19324: [FLINK-26757][state] Change the default value of state.backend.rocksdb.restore-overlap-fraction-threshold to 0

2022-04-11 Thread GitBox


Myasuka merged PR #19324:
URL: https://github.com/apache/flink/pull/19324


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myasuka commented on a diff in pull request #19413: [FLINK-16078] [docs-zh] Translate "Tuning Checkpoints and Large State…

2022-04-11 Thread GitBox


Myasuka commented on code in PR #19413:
URL: https://github.com/apache/flink/pull/19413#discussion_r847908442


##
docs/content.zh/docs/ops/state/large_state_tuning.md:
##
@@ -166,149 +125,101 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 }
 ```
 
-## Capacity Planning
-
-This section discusses how to decide how many resources should be used for a 
Flink job to run reliably.
-The basic rules of thumb for capacity planning are:
-
-  - Normal operation should have enough capacity to not operate under constant 
*back pressure*.
-See [back pressure monitoring]({{< ref "docs/ops/monitoring/back_pressure" 
>}}) for details on how to check whether the application runs under back 
pressure.
-
-  - Provision some extra resources on top of the resources needed to run the 
program back-pressure-free during failure-free time.
-These resources are needed to "catch up" with the input data that 
accumulated during the time the application
-was recovering.
-How much that should be depends on how long recovery operations usually 
take (which depends on the size of the state
-that needs to be loaded into the new TaskManagers on a failover) and how 
fast the scenario requires failures to recover.
-
-*Important*: The base line should to be established with checkpointing 
activated, because checkpointing ties up
-some amount of resources (such as network bandwidth).
-
-  - Temporary back pressure is usually okay, and an essential part of 
execution flow control during load spikes,
-during catch-up phases, or when external systems (that are written to in a 
sink) exhibit temporary slowdown.
+## 容量规划
+本节讨论如何确定 Flink 作业应该使用多少资源才能可靠地运行。
+容量规划的基本经验法则是:
+  - 正常运行应该有足够的能力在恒定的*反压*下运行。
+如何检查应用程序是否在反压下运行的详细信息,请参阅 [反压监控]({{< ref 
"docs/ops/monitoring/back_pressure" >}})。
+  - 在无故障时间内无反压程序运行所需资源之上提供一些额外的资源。
+需要这些资源来“赶上”在应用程序恢复期间积累的输入数据。
+这通常取决于恢复操作需要多长时间(这取决于需要在故障转移时加载到新 TaskManager 中的状态大小)以及故障恢复的速度。
+*重要*:基线应该在开启 checkpointing 的情况下建立,因为 checkpointing 会占用一些资源(例如网络带宽)。
+  - 临时反压通常是可以的,在负载峰值、追赶阶段或外部系统(写入接收器中)出现临时减速时,这是执行流控制的重要部分。
 
-  - Certain operations (like large windows) result in a spiky load for their 
downstream operators: 
-In the case of windows, the downstream operators may have little to do 
while the window is being built,
-and have a load to do when the windows are emitted.
-The planning for the downstream parallelism needs to take into account how 
much the windows emit and how
-fast such a spike needs to be processed.
+  - 某些操作(如大窗口)会导致其下游算子的负载激增:
+在窗口的情况下,下游算子可能在构建窗口时几乎无事可做,而在窗口发出时有负载要做。
+下游并行度的设置需要考虑到窗口输出多少以及需要以多快的速度处理这种峰值。
 
-**Important:** In order to allow for adding resources later, make sure to set 
the *maximum parallelism* of the
-data stream program to a reasonable number. The maximum parallelism defines 
how high you can set the programs
-parallelism when re-scaling the program (via a savepoint).
+**重要**:为了方便以后添加资源,请务必将数据流程序的*最大并行度*设置为合理的数字。 最大并行度定义了在重新缩放程序时(通过 savepoint 
)可以设置程序并行度的高度。
 
-Flink's internal bookkeeping tracks parallel state in the granularity of 
max-parallelism-many *key groups*.
-Flink's design strives to make it efficient to have a very high value for the 
maximum parallelism, even if
-executing the program with a low parallelism.
+Flink 的内部以多个*键组(key groups)* 的最大并行度为粒度跟踪并行状态。
+Flink 的设计力求使最大并行度的值达到很高的效率,即使执行程序时并行度很低。
 
-## Compression
-
-Flink offers optional compression (default: off) for all checkpoints and 
savepoints. Currently, compression always uses 
-the [snappy compression algorithm (version 
1.1.4)](https://github.com/xerial/snappy-java) but we are planning to support
-custom compression algorithms in the future. Compression works on the 
granularity of key-groups in keyed state, i.e.
-each key-group can be decompressed individually, which is important for 
rescaling. 
-
-Compression can be activated through the `ExecutionConfig`:
+## 压缩
+Flink 为所有 checkpoints 和 savepoints 提供可选的压缩(默认:关闭)。 目前,压缩总是使用 [snappy 压缩算法(版本 
1.1.4)](https://github.com/xerial/snappy-java),
+但我们计划在未来支持自定义压缩算法。 压缩作用于 keyed state 下 key-groups 的粒度,即每个 key-groups 
可以单独解压缩,这对于重新缩放很重要。
 
+可以通过 `ExecutionConfig` 开启压缩:
 ```java
 ExecutionConfig executionConfig = new ExecutionConfig();
 executionConfig.setUseSnapshotCompression(true);
 ```
 
-Note The compression option has no 
impact on incremental snapshots, because they are using RocksDB's internal
-format which is always using snappy compression out of the box.
-
-## Task-Local Recovery
+注意 压缩选项对增量快照没有影响,因为它们使用的是 RocksDB 
的内部格式,该格式始终使用开箱即用的 snappy 压缩。
 
-### Motivation
+## Task 本地恢复
+### 问题引入
+在 Flink 的 checkpointing 中,每个 task 都会生成其状态快照,然后将其写入分布式存储。 每个 task 
通过发送一个描述分布式存储中的位置状态的句柄,向 jobmanager 确认状态的成功写入。
+JobManager 反过来收集所有 tasks 的句柄并将它们捆绑到一个 checkpoint 对象中。
 
-In Flink's checkpointing, each task produces a snapshot of its state that is 
then written to a distributed store. Each task acknowledges
-a successful write of the state to the job 

[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847906550


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -38,6 +38,8 @@ under the License.
 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。
 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。
 
+
+
 # 可用的 State Backends

Review Comment:
   I checked this page again and found three other places with the same problem; all of 
them have been fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on PR #19252:
URL: https://github.com/apache/flink/pull/19252#issuecomment-1095911460

   Thanks for your review again @masteryhx , I have fixed the problems you mentioned.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847905580


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}
 
-Changelog is a feature that aims to decrease checkpointing time and, 
therefore, end-to-end latency in exactly-once mode.
+
 
-Most commonly, checkpoint duration is affected by:
+### 介绍
 
-1. Barrier travel time and alignment, addressed by
-   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
-   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
-2. Snapshot creation time (so-called synchronous phase), addressed by 
asynchronous snapshots (mentioned [above]({{<
-   ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。
 
-Upload time can be decreased by [incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}).
-However, most incremental state backends perform some form of compaction 
periodically, which results in re-uploading the
-old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
-data tends to be very high in every checkpoint.
+一般情况下 checkpoint 持续时间受如下因素影响:
 
-With Changelog enabled, Flink uploads state changes continuously and forms a 
changelog. On checkpoint, only the relevant
-part of this changelog needs to be uploaded. The configured state backend is 
snapshotted in the
-background periodically. Upon successful upload, the changelog is truncated.
+1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) 
and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #19400: [FLINK-25142][Connectors / Hive]Fix user-defined hive udtf initialize exception in hive dialect

2022-04-11 Thread GitBox


luoyuxia commented on PR #19400:
URL: https://github.com/apache/flink/pull/19400#issuecomment-1095903340

   Thanks @Tartarus0zm for the contribution.
   It's a continuation of #17990.
   LGTM.
   @beyond1920 @wuchong Could you please help review & merge?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27191) Support multi kerberos-enabled Hive clusters

2022-04-11 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-27191:
-
Component/s: Connectors / Hive

> Support multi kerberos-enabled Hive clusters 
> -
>
> Key: FLINK-27191
> URL: https://issues.apache.org/jira/browse/FLINK-27191
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently, to access a kerberos-enabled Hive cluster, users are expected to 
> add the ker/secret in flink-conf. But this can only access one Hive cluster in one 
> Flink cluster; we should also support multiple kerberos-enabled Hive 
> clusters in one Flink cluster.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27191) Support multi kerberos-enabled Hive clusters

2022-04-11 Thread luoyuxia (Jira)
luoyuxia created FLINK-27191:


 Summary: Support multi kerberos-enabled Hive clusters 
 Key: FLINK-27191
 URL: https://issues.apache.org/jira/browse/FLINK-27191
 Project: Flink
  Issue Type: Improvement
Reporter: luoyuxia
 Fix For: 1.16.0


Currently, to access a kerberos-enabled Hive cluster, users are expected to add 
the ker/secret in flink-conf. But this can only access one Hive cluster in one Flink 
cluster; we should also support multiple kerberos-enabled Hive clusters 
in one Flink cluster.
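For context, a minimal sketch (not from the ticket; paths and the principal are
placeholders) of today's single-cluster setup using Flink's existing Kerberos login
options, which is why only one secure Hive cluster can be reached per Flink cluster:
{code:java}
import org.apache.flink.configuration.Configuration;

// Sketch: one keytab and principal configured for the whole Flink cluster.
public class KerberosLoginConfig {
    public static Configuration build() {
        Configuration conf = new Configuration();
        conf.setString("security.kerberos.login.keytab", "/path/to/flink.keytab");
        conf.setString("security.kerberos.login.principal", "flink-user@EXAMPLE.COM");
        conf.setString("security.kerberos.login.use-ticket-cache", "false");
        return conf;
    }
}
{code}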



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] luoyuxia commented on pull request #19423: [FLINK-27175][hive] Support to call Hive UDAF when the UDAF accepts one parameter with array type

2022-04-11 Thread GitBox


luoyuxia commented on PR #19423:
URL: https://github.com/apache/flink/pull/19423#issuecomment-1095873524

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19429: [hotfix][docs] Fix the "capacity-planning" page of "Tuning Checkpoint…

2022-04-11 Thread GitBox


flinkbot commented on PR #19429:
URL: https://github.com/apache/flink/pull/19429#issuecomment-1095867338

   
   ## CI report:
   
   * 9e092e3dfca1ea6effdf2edd26acdf9c590d5ac6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wenlong88 commented on pull request #19179: [FLINK-26756][table-planner] Fix the deserialization error for match recognize

2022-04-11 Thread GitBox


wenlong88 commented on PR #19179:
URL: https://github.com/apache/flink/pull/19179#issuecomment-1095863004

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuzhuang2017 opened a new pull request, #19429: [hotfix][docs] Fix the "capacity-planning" page of "Tuning Checkpoint…

2022-04-11 Thread GitBox


liuzhuang2017 opened a new pull request, #19429:
URL: https://github.com/apache/flink/pull/19429

   Fix the "capacity-planning" page of "Tuning Checkpoints and Large State"
   
   
   
   ## What is the purpose of the change
   
   Fix the "capacity-planning" page of "Tuning Checkpoints and Large State"
   
   
   ## Brief change log
   
   Fix the "capacity-planning" page of "Tuning Checkpoints and Large State"
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'

2022-04-11 Thread GitBox


LadyForest commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847890543


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java:
##
@@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) {
 .map(FieldReferenceExpression::getFieldIndex)
 .map(IsNotNull::new)
 .orElseThrow(UnsupportedExpression::new);
+} else if (func == BuiltInFunctionDefinitions.LIKE) {
+FieldReferenceExpression fieldRefExpr =
+
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot()
+== LogicalTypeRoot.VARCHAR) {
+String sqlPattern =
+extractLiteral(fieldRefExpr.getOutputDataType(), 
children.get(1))
+.orElseThrow(UnsupportedExpression::new)
+.value()
+.toString();
+String escape =
+children.size() <= 2
+? null
+: 
extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
+
.orElseThrow(UnsupportedExpression::new)
+.value()
+.toString();
+if (escape == null) {
+Matcher matcher = BEGIN_PATTERN.matcher(sqlPattern);
+if (matcher.matches()) {
+return new StartsWith(
+fieldRefExpr.getFieldIndex(),
+matcher.group(1),
+sqlPattern.endsWith("_"));

Review Comment:
   > What does `sqlPattern.endsWith("_")` mean?
   
   match one or more characters



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847887144


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}

Review Comment:
   Got it, thanks, I will change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847885936


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -38,6 +38,8 @@ under the License.
 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。
 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。
 
+
+
 # 可用的 State Backends

Review Comment:
   Got it! Thanks for your patience in explaining, @masteryhx. I will fix it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


masteryhx commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847882156


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}

Review Comment:
   I think it could be changed to "您" here, since the sentence is a warning, as 
described in the [Flink Translation 
Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


masteryhx commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847877301


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -38,6 +38,8 @@ under the License.
 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。
 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。
 
+
+
 # 可用的 State Backends

Review Comment:
   The table of contents shown on the right is inconsistent; you can check it by 
running the doc server.
   We need to replace "#" with "##" so that the table of contents is rendered 
correctly.
   I think "选择合适的 State Backend" and "RocksDB State Backend 进阶" have a similar 
problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27018) timestamp missing end zero when outputing to kafka

2022-04-11 Thread Yao Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520882#comment-17520882
 ] 

Yao Zhang commented on FLINK-27018:
---

Hi [~fpaul] and [~jeff-zou] ,

I reproduced this issue with Flink 1.14.2. After debugging, I found that the problem 
lies in SQL_TIMESTAMP_FORMAT.

Could you please assign this issue to me?

> timestamp missing end  zero when outputing to kafka
> ---
>
> Key: FLINK-27018
> URL: https://issues.apache.org/jira/browse/FLINK-27018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.5
>Reporter: jeff-zou
>Priority: Major
> Attachments: kafka.png
>
>
> The bug is described as follows:
>  
> {code:java}
> data in source:
>  2022-04-02 03:34:21.260
> but after sink by sql, data in kafka:
>  2022-04-02 03:34:21.26
> {code}
>  
> The data is missing the trailing zero in Kafka.
>  
> sql:
> {code:java}
> create table kafka_table (stime timestamp(3)) with ('connector' = 'kafka', 
> 'format' = 'json');
> insert into kafka_table select stime from (values (timestamp '2022-04-02 
> 03:34:21.260')) as t(stime);
> {code}
> the value in kafka is : \{"stime":"2022-04-02 03:34:21.26"}, missing the trailing zero.
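As an illustration of the symptom only (not the actual Flink code path), a
variable-width fraction pattern in java.time drops trailing zeros, while a
fixed-width one keeps them:
{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class FractionFormatDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2022-04-02T03:34:21.260");

        // variable width (min < max): trailing zeros of the fraction are dropped
        DateTimeFormatter variableWidth = new DateTimeFormatterBuilder()
                .appendPattern("yyyy-MM-dd HH:mm:ss")
                .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                .toFormatter();
        // fixed width (min == max == 3): milliseconds are always printed in full
        DateTimeFormatter fixedWidth = new DateTimeFormatterBuilder()
                .appendPattern("yyyy-MM-dd HH:mm:ss")
                .appendFraction(ChronoField.NANO_OF_SECOND, 3, 3, true)
                .toFormatter();

        System.out.println(variableWidth.format(ts)); // 2022-04-02 03:34:21.26
        System.out.println(fixedWidth.format(ts));    // 2022-04-02 03:34:21.260
    }
}
{code}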



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'

2022-04-11 Thread GitBox


JingsongLi commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847865264


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java:
##
@@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) {
 .map(FieldReferenceExpression::getFieldIndex)
 .map(IsNotNull::new)
 .orElseThrow(UnsupportedExpression::new);
+} else if (func == BuiltInFunctionDefinitions.LIKE) {
+FieldReferenceExpression fieldRefExpr =
+
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot()
+== LogicalTypeRoot.VARCHAR) {

Review Comment:
   Family is `LogicalTypeFamily.CHARACTER_STRING`.
   Char also is OK.



##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java:
##
@@ -1224,6 +1226,144 @@ public void testQueryContainsDefaultFieldName() throws 
Exception {
 .containsOnly(changelogRow("+I", 1, "abc"));
 }
 
+@Test
+public void testLike() throws Exception {
+rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+tEnv = StreamTableEnvironment.create(buildBatchEnv(), 
EnvironmentSettings.inBatchMode());
+List input =
+Arrays.asList(
+changelogRow("+I", 1, "test_1"),
+changelogRow("+I", 2, "test_2"),
+changelogRow("+I", 1, "test_%"),
+changelogRow("+I", 2, "test%2"),
+changelogRow("+I", 3, "university"),
+changelogRow("+I", 4, "very"),
+changelogRow("+I", 5, "yield"));
+String id = registerData(input);
+UUID randomSourceId = UUID.randomUUID();
+tEnv.executeSql(
+String.format(
+"create table `helper_source_%s` (f0 int, f1 string) 
with ("
++ "'connector' = 'values', "
++ "'bounded' = 'true', "
++ "'data-id' = '%s')",
+randomSourceId, id));
+
+UUID randomSinkId = UUID.randomUUID();
+tEnv.executeSql(
+String.format(
+"create table `managed_table_%s` with ("
++ "'%s' = '%s'"
++ ") like `helper_source_%s` "
++ "(excluding options)",
+randomSinkId, FileStoreOptions.PATH.key(), rootPath, 
randomSourceId));
+tEnv.executeSql(

Review Comment:
   To test the filter, the best way is to insert it multiple times so that 
multiple files are generated to achieve the effect we want to filter the files.



##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java:
##
@@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) {
 .map(FieldReferenceExpression::getFieldIndex)
 .map(IsNotNull::new)
 .orElseThrow(UnsupportedExpression::new);
+} else if (func == BuiltInFunctionDefinitions.LIKE) {
+FieldReferenceExpression fieldRefExpr =
+
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot()
+== LogicalTypeRoot.VARCHAR) {
+String sqlPattern =
+extractLiteral(fieldRefExpr.getOutputDataType(), 
children.get(1))
+.orElseThrow(UnsupportedExpression::new)
+.value()
+.toString();
+String escape =
+children.size() <= 2
+? null
+: 
extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
+
.orElseThrow(UnsupportedExpression::new)
+.value()
+.toString();
+if (escape == null) {
+Matcher matcher = BEGIN_PATTERN.matcher(sqlPattern);
+if (matcher.matches()) {
+return new StartsWith(
+fieldRefExpr.getFieldIndex(),
+matcher.group(1),
+sqlPattern.endsWith("_"));

Review Comment:
   What does `sqlPattern.endsWith("_")` mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL 

[GitHub] [flink] beyond1920 closed pull request #19109: [FLINK-26681][hive] Support sql statement end with ";" for Hive dialect query

2022-04-11 Thread GitBox


beyond1920 closed pull request #19109: [FLINK-26681][hive] Support sql 
statement end with ";" for Hive dialect query
URL: https://github.com/apache/flink/pull/19109


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl

2022-04-11 Thread GitBox


yunfengzhou-hub commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r847844186


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/ftrl/Ftrl.java:
##
@@ -0,0 +1,395 @@
+/*

Review Comment:
   Jira tickets for the algorithms that are planned to be added have all been 
created in advance. You can find them on 
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=541; the ticket 
for FTRL is FLINK-20790.



##
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/FtrlTest.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.classification.ftrl.Ftrl;
+import org.apache.flink.ml.classification.ftrl.FtrlModel;
+import 
org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+
+/** Tests {@link Ftrl} and {@link FtrlModel}. */
+public class FtrlTest extends AbstractTestBase {

Review Comment:
   The test cases are arranged differently from existing practice. Let's add 
tests that each cover one of the following situations (a bare skeleton follows below):
   - tests getting/setting parameters
   - tests the most common fit/transform process
   - tests save/load
   - tests getting/setting model data
   - tests invalid inputs/corner cases
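
   A bare skeleton of that layout (a sketch only; method names are illustrative and
not taken from the PR):
   ```java
   import org.junit.Test;

   // Sketch of the suggested test layout; each method body is a placeholder.
   public class FtrlTestSkeleton {

       @Test
       public void testParam() {
           // getting/setting parameters
       }

       @Test
       public void testFitAndPredict() {
           // the most common fit/transform process
       }

       @Test
       public void testSaveLoadAndPredict() {
           // save/load
       }

       @Test
       public void testGetModelData() {
           // getting/setting model data
       }

       @Test
       public void testInvalidInput() {
           // invalid inputs / corner cases
       }
   }
   ```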



##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java:
##
@@ -37,60 +38,68 @@
 import java.io.OutputStream;
 
 /**
- * Model data of {@link LogisticRegressionModel}.
+ * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}.
  *
  * This class also provides methods to convert model data from Table to 
Datastream, and classes
  * to save/load model data.
  */
-public class LogisticRegressionModelData {
+public class LinearModelData {

Review Comment:
   Could you please illustrate the relationship between FTRL and 
LogisticRegression, and other algorithms like LinearRegression? I'm not sure 
why we would like to rename `LogisticRegressionModelData` to `LinearModelData`.
   
   If after discussion we still agree that this renaming is reasonable, it 
would mean that the model data class belongs to neither the `logisticregression` 
nor the `ftrl` package. We would then need to place classes like this in a neutral 
package, for example one named `common`.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java:
##
@@ -37,60 +38,68 @@
 import java.io.OutputStream;
 
 /**
- * Model data of {@link LogisticRegressionModel}.
+ * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}.
  *
  * This class also provides methods to convert model data from Table to 
Datastream, and 

[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is

2022-04-11 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520859#comment-17520859
 ] 

Feifan Wang commented on FLINK-27187:
-

I think it's ok. Should I open a new ticket to do this work, or update this 
ticket directly?

> The attemptsPerUpload metric may be lower than it actually is
> -
>
> Key: FLINK-27187
> URL: https://issues.apache.org/jira/browse/FLINK-27187
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the 
> distribution of the number of attempts per upload.
> In the current implementation, each successful attempt tries to update 
> attemptsPerUpload with its own attemptNumber.
> But consider this case: 
>  # attempt 1 times out, so attempt 2 is scheduled
>  # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1
> In fact there are two attempts, but attemptsPerUpload is updated with 1.
> So, I think we should add an "actionAttemptsCount" field to 
> RetryExecutor.RetriableActionAttempt. This field would be shared across all 
> attempts of the same upload action and represent the total number of upload 
> attempts, and the completed attempt should use it to update attemptsPerUpload.
>  
> What do you think? [~ym], [~roman]
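To make the proposal concrete, a minimal sketch of the idea (names such as
RetriableAction and Histogram are simplified stand-ins, not the actual Flink
classes): all attempts of one upload action share a single counter, so whichever
attempt completes first still reports the true attempt count.
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

class RetriableAction {
    // shared by all attempts of the same upload action
    private final AtomicInteger totalAttempts = new AtomicInteger();

    // called whenever a (re)try is scheduled
    int startAttempt() {
        return totalAttempts.incrementAndGet();
    }

    // called by the attempt that completes the upload
    void reportCompleted(Histogram attemptsPerUpload) {
        // record the shared total, not the completing attempt's own number
        attemptsPerUpload.update(totalAttempts.get());
    }

    interface Histogram {
        void update(long value);
    }
}
{code}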



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is

2022-04-11 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520845#comment-17520845
 ] 

Roman Khachatryan commented on FLINK-27187:
---

How about "totalAttemptsPerUpload"?

> The attemptsPerUpload metric may be lower than it actually is
> -
>
> Key: FLINK-27187
> URL: https://issues.apache.org/jira/browse/FLINK-27187
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the 
> distribution of the number of attempts per upload.
> In the current implementation, each successful attempt tries to update 
> attemptsPerUpload with its own attemptNumber.
> But consider this case: 
>  # attempt 1 times out, so attempt 2 is scheduled
>  # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1
> In fact there are two attempts, but attemptsPerUpload is updated with 1.
> So, I think we should add an "actionAttemptsCount" field to 
> RetryExecutor.RetriableActionAttempt. This field would be shared across all 
> attempts of the same upload action and represent the total number of upload 
> attempts, and the completed attempt should use it to update attemptsPerUpload.
>  
> What do you think? [~ym], [~roman]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is

2022-04-11 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520836#comment-17520836
 ] 

Feifan Wang commented on FLINK-27187:
-

[~roman] , how would you like to name the metric for "the total number of 
attempts"?

> The attemptsPerUpload metric may be lower than it actually is
> -
>
> Key: FLINK-27187
> URL: https://issues.apache.org/jira/browse/FLINK-27187
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the 
> distribution of the number of attempts per upload.
> In the current implementation, each successful attempt tries to update 
> attemptsPerUpload with its own attemptNumber.
> But consider this case: 
>  # attempt 1 times out, so attempt 2 is scheduled
>  # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1
> In fact there are two attempts, but attemptsPerUpload is updated with 1.
> So, I think we should add an "actionAttemptsCount" field to 
> RetryExecutor.RetriableActionAttempt. This field would be shared across all 
> attempts of the same upload action and represent the total number of upload 
> attempts, and the completed attempt should use it to update attemptsPerUpload.
>  
> What do you think? [~ym], [~roman]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is

2022-04-11 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520833#comment-17520833
 ] 

Roman Khachatryan commented on FLINK-27187:
---

The number of the successful attempt is actually the intended meaning of the 
metric.

But a metric for the total number of attempts can also be added.

> The attemptsPerUpload metric may be lower than it actually is
> -
>
> Key: FLINK-27187
> URL: https://issues.apache.org/jira/browse/FLINK-27187
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the 
> distribution of the number of attempts per upload.
> In the current implementation, each successful attempt tries to update 
> attemptsPerUpload with its own attemptNumber.
> But consider this case: 
>  # attempt 1 times out, so attempt 2 is scheduled
>  # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1
> In fact there are two attempts, but attemptsPerUpload is updated with 1.
> So, I think we should add an "actionAttemptsCount" field to 
> RetryExecutor.RetriableActionAttempt. This field would be shared across all 
> attempts of the same upload action and represent the total number of upload 
> attempts, and the completed attempt should use it to update attemptsPerUpload.
>  
> What do you think? [~ym], [~roman]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint

2022-04-11 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520830#comment-17520830
 ] 

Roman Khachatryan commented on FLINK-27132:
---

Sorry, I mean the 5th item.

And yes, I mean ZK or K8s or any other handle store.

 

> AFAIU, the {{SharedStateRegistry}} only loads the data from 
> {{{}CompletedCheckpointStore{}}}, i.e. if the data is not present through 
> these checkpoints, it won't be discarded.
 
Yes, but the (new) checkpoints that are still present there might still refer 
to the original files under the shared/ folder, and these files might therefore be 
discarded.

> CheckpointResourcesCleanupRunner might discard shared state of the initial 
> checkpoint
> -
>
> Key: FLINK-27132
> URL: https://issues.apache.org/jira/browse/FLINK-27132
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> When considering the following case: # A job starts from a checkpoint in 
> NO_CLAIM mode, with incremental checkpoints enabled
>  # It produces some new checkpoints and subsumes the original one (not 
> discarding shared state - before FLINK-24611 or after FLINK-26985)
>  # Job terminates abruptly
>  # The cleaner is started for that job
>  # ZK doesn't have the initial checkpoint, so the store will load only the 
> new checkpoints (created in 2). Shared state is registered
>  # The store is shut down - discarding all the checkpoints and also any 
> shared state
> In -6- 5, if some checkpoint uses the initial state, it will also be discarded
>  
> [~mapohl] could you please confirm this?
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22

2022-04-11 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-26070:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Update dependency numpy to >=1.21.0,<1.22
> -
>
> Key: FLINK-26070
> URL: https://issues.apache.org/jira/browse/FLINK-26070
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> PyFlink currently sets the required numpy version as >=1.14.3,<1.2
> We should update this to >=1.21.0,<1.22
> Updating numpy will also resolve being marked as vulnerable for 
> https://nvd.nist.gov/vuln/detail/CVE-2021-33430



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25962) Flink generated Avro schemas can't be parsed using Python

2022-04-11 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-25962:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Flink generated Avro schemas can't be parsed using Python
> -
>
> Key: FLINK-25962
> URL: https://issues.apache.org/jira/browse/FLINK-25962
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.3
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> Flink currently generates Avro schemas as records with the top-level name 
> {{"record"}}
> Unfortunately, there is some inconsistency between Avro implementations in 
> different languages that may prevent this record from being read, notably 
> Python, which generates the error:
> *avro.schema.SchemaParseException: record is a reserved type name*
> (See the comment on FLINK-18096 for the full stack trace).
> The Java SDK accepts this name, and there's an [ongoing 
> discussion|https://lists.apache.org/thread/0wmgyx6z69gy07lvj9ndko75752b8cn2] 
> about what the expected behaviour should be.  This should be clarified and 
> fixed in Avro, of course.
> Regardless of the resolution, the best practice (which is used almost 
> everywhere else in the Flink codebase) is to explicitly specify a top-level 
> namespace for an Avro record.   We should use a default like: 
> {{{}org.apache.flink.avro.generated{}}}.
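As a minimal sketch of the suggested fix (using the standard Avro Java
SchemaBuilder API; the field name f0 is a placeholder), giving the generated
record an explicit namespace avoids the bare top-level name "record" that the
Python parser rejects:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NamespacedRecordExample {
    public static void main(String[] args) {
        // "record" is still the record name, but it is now qualified by a namespace
        Schema schema = SchemaBuilder
                .record("record")
                .namespace("org.apache.flink.avro.generated")
                .fields()
                .requiredString("f0")
                .endRecord();
        System.out.println(schema.toString(true));
    }
}
{code}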



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-21383) Docker image does not play well together with ConfigMap based flink-conf.yamls

2022-04-11 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21383:
---
  Labels: auto-deprioritized-major usability  (was: stale-major usability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Docker image does not play well together with ConfigMap based flink-conf.yamls
> --
>
> Key: FLINK-21383
> URL: https://issues.apache.org/jira/browse/FLINK-21383
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, flink-docker
>Affects Versions: 1.11.6, 1.12.7, 1.13.5, 1.14.3
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: auto-deprioritized-major, usability
>
> Flink's Docker image does not play well together with ConfigMap based 
> flink-conf.yamls. The {{docker-entrypoint.sh}} script offers a few env 
> variables to overwrite configuration values (e.g. {{FLINK_PROPERTIES}}, 
> {{JOB_MANAGER_RPC_ADDRESS}}, etc.). The problem is that the entrypoint script 
> assumes that it can modify the existing {{flink-conf.yaml}}. This is not the 
> case if the {{flink-conf.yaml}} is based on a {{ConfigMap}}.
> Making things worse, failures updating the {{flink-conf.yaml}} are not 
> reported. Moreover, the called {{jobmanager.sh}} and {{taskmanager.sh}} 
> scripts don't support to pass in dynamic configuration properties into the 
> processes.
> I think the problem is that our assumption that we can modify the 
> {{flink-conf.yaml}} does not always hold true. If we updated the final 
> configuration from within the Flink process (dynamic properties and env 
> variables), then this problem could be avoided.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint

2022-04-11 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-27132:
--
Description: 
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

In -6- 5, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]

  was:
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In -6-5, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]


> CheckpointResourcesCleanupRunner might discard shared state of the initial 
> checkpoint
> -
>
> Key: FLINK-27132
> URL: https://issues.apache.org/jira/browse/FLINK-27132
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> When considering the following case: # A job starts from a checkpoint in 
> NO_CLAIM mode, with incremental checkpoints enabled
>  # It produces some new checkpoints and subsumes the original one (not 
> discarding shared state - before FLINK-24611 or after FLINK-26985)
>  # Job terminates abruptly
>  # The cleaner is started for that job
>  # ZK doesn't have the initial checkpoint, so the store will load only the 
> new checkpoints (created in 2). Shared state is registered
>  # The store is shut down - discarding all the checkpoints and also any 
> shared state
> In -6- 5, if some checkpoint uses the initial state, it will also be discarded
>  
> [~mapohl] could you please confirm this?
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint

2022-04-11 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-27132:
--
Description: 
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In 5, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]

  was:
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In 6, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]


> CheckpointResourcesCleanupRunner might discard shared state of the initial 
> checkpoint
> -
>
> Key: FLINK-27132
> URL: https://issues.apache.org/jira/browse/FLINK-27132
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> When considering the following case: # A job starts from a checkpoint in 
> NO_CLAIM mode, with incremental checkpoints enabled
>  # It produces some new checkpoints and subsumes the original one (not 
> discarding shared state - before FLINK-24611 or after FLINK-26985)
>  # Job terminates abruptly
>  # The cleaner is started for that job
>  # ZK doesn't have the initial checkpoint, so the store will load only the 
> new checkpoints (created in 2). Shared state is registered
>  # The store is shut down - discarding all the checkpoints and also any 
> shared state
>  
> In 5, if some checkpoint uses the initial state, it will also be discarded
>  
> [~mapohl] could you please confirm this?
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint

2022-04-11 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-27132:
--
Description: 
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In -6-5, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]

  was:
When considering the following case: # A job starts from a checkpoint in 
NO_CLAIM mode, with incremental checkpoints enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In 5, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]


> CheckpointResourcesCleanupRunner might discard shared state of the initial 
> checkpoint
> -
>
> Key: FLINK-27132
> URL: https://issues.apache.org/jira/browse/FLINK-27132
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> When considering the following case: # A job starts from a checkpoint in 
> NO_CLAIM mode, with incremental checkpoints enabled
>  # It produces some new checkpoints and subsumes the original one (not 
> discarding shared state - before FLINK-24611 or after FLINK-26985)
>  # Job terminates abruptly
>  # The cleaner is started for that job
>  # ZK doesn't have the initial checkpoint, so the store will load only the 
> new checkpoints (created in 2). Shared state is registered
>  # The store is shut down - discarding all the checkpoints and also any 
> shared state
>  
> In -6-5, if some checkpoint uses the initial state, it will also be discarded
>  
> [~mapohl] could you please confirm this?
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26811) Document CRD upgrade process

2022-04-11 Thread Ted Chang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520813#comment-17520813
 ] 

Ted Chang commented on FLINK-26811:
---

[~wangyang0918] good reference. I will add that.

> Document CRD upgrade process
> 
>
> Key: FLINK-26811
> URL: https://issues.apache.org/jira/browse/FLINK-26811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Ted Chang
>Priority: Major
> Fix For: kubernetes-operator-0.1.0
>
>
> We need to document how to update the CRD with a newer version. During 
> development, we delete the old CRD and create it from scratch. In an 
> environment with existing deployments that isn't possible, as deleting the 
> CRD would wipe out all existing CRs.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-27101) Periodically break the chain of incremental checkpoint

2022-04-11 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520811#comment-17520811
 ] 

Steven Zhen Wu commented on FLINK-27101:


[~yunta] a pluggable checkpoint trigger policy might work well here. If I 
understand you correctly, you are saying we can switch the triggered checkpoint 
from incremental to full in a customized policy based on some cron conditions.

Triggering a savepoint is not a concern; it can be scheduled by the control 
plane. If we want to use the savepoint to break the incremental chain, we would 
need to redeploy the job from the savepoint, and that redeployment could feel 
hacky.
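
To make the idea concrete, here is a rough sketch of what such a pluggable policy 
could look like. This is purely illustrative: CheckpointTriggerPolicy, SnapshotType 
and the method names are made up for this example and are not existing Flink APIs.
{code:java}
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a cron-like policy that periodically requests a full
// checkpoint to break the incremental chain, and keeps requesting FULL until
// one full checkpoint succeeds.
interface CheckpointTriggerPolicy {
    SnapshotType nextSnapshotType(Instant now, Instant lastSuccessfulFull, boolean lastFullFailed);
}

enum SnapshotType { INCREMENTAL, FULL }

class PeriodicFullSnapshotPolicy implements CheckpointTriggerPolicy {
    private final Duration fullInterval;

    PeriodicFullSnapshotPolicy(Duration fullInterval) {
        this.fullInterval = fullInterval;
    }

    @Override
    public SnapshotType nextSnapshotType(Instant now, Instant lastSuccessfulFull, boolean lastFullFailed) {
        boolean intervalElapsed = Duration.between(lastSuccessfulFull, now).compareTo(fullInterval) >= 0;
        return (lastFullFailed || intervalElapsed) ? SnapshotType.FULL : SnapshotType.INCREMENTAL;
    }
}
{code}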

> Periodically break the chain of incremental checkpoint
> --
>
> Key: FLINK-27101
> URL: https://issues.apache.org/jira/browse/FLINK-27101
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Steven Zhen Wu
>Priority: Major
>
> Incremental checkpoint is almost a must for large-state jobs. It greatly 
> reduces the bytes uploaded to DFS per checkpoint. However, there are a few 
> implications from incremental checkpoint that are problematic for production 
> operations. I will use S3 as an example DFS for the rest of the description.
> 1. Because there is no way to deterministically know how far back the 
> incremental checkpoint can refer to files uploaded to S3, it is very 
> difficult to set S3 bucket/object TTL. In one application, we have observed 
> Flink checkpoint referring to files uploaded over 6 months ago. S3 TTL can 
> corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons
> - purge orphaned files (like external checkpoints from previous deployments) 
> to keep the storage cost in check. This problem can be addressed by 
> implementing proper garbage collection (similar to JVM) by traversing the 
> retained checkpoints from all jobs and traverse the file references. But that 
> is an expensive solution from engineering cost perspective.
> - Security and privacy. E.g., there may be requirement that Flink state can't 
> keep the data for more than some duration threshold (hours/days/weeks). 
> Application is expected to purge keys to satisfy the requirement. However, 
> with incremental checkpoint and how deletion works in RocksDB, it is hard to 
> set S3 TTL to purge S3 files. Even though those old S3 files don't contain 
> live keys, they may still be referenced by retained Flink checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a 
> result, restoring from checkpoint failed. With incremental checkpoint, it 
> usually doesn't help to try other older checkpoints, because they may refer 
> to the same corrupted file. It is unclear whether the corruption happened 
> before or during S3 upload. This risk can be mitigated with periodical 
> savepoints.
> It all boils down to a periodical full snapshot (checkpoint or savepoint) to 
> deterministically break the chain of incremental checkpoints. Searching the 
> Jira history, the behavior that FLINK-23949 [1] is trying to fix is actually 
> close to what we would need here.
> There are a few options
> 1. Periodically trigger savepoints (via control plane). This is actually not 
> a bad practice and might be appealing to some people. The problem is that it 
> requires a job deployment to break the chain of incremental checkpoint. 
> Periodical job deployment may sound hacky. If we make the behavior of full 
> checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be 
> an acceptable compromise. The benefit is that no job deployment is required 
> after savepoints.
> 2. Build the feature in Flink incremental checkpoint. Periodically (with some 
> cron style config) trigger a full checkpoint to break the incremental chain. 
> If the full checkpoint failed (due to whatever reason), the following 
> checkpoints should attempt full checkpoint as well until one successful full 
> checkpoint is completed.
> 3. For the security/privacy requirement, the main thing is to apply 
> compaction on the deleted keys. That could probably avoid references to the 
> old files. Is there any RocksDB compaction that can achieve full compaction, 
> removing old delete markers? Recent delete markers are fine.
> [1] https://issues.apache.org/jira/browse/FLINK-23949



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27190) Revisit error handling in main reconcile() loop

2022-04-11 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-27190:
-

 Summary: Revisit error handling in main reconcile() loop
 Key: FLINK-27190
 URL: https://issues.apache.org/jira/browse/FLINK-27190
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Matyas Orhidi
 Fix For: kubernetes-operator-1.0.0


There are some improvements introduced around error handling:
 * [https://github.com/java-operator-sdk/java-operator-sdk/pull/1033]

 in the upcoming java-operator-sdk release 
[v3.0.0.RC1.|https://github.com/java-operator-sdk/java-operator-sdk/releases/tag]
 We should revisit and further simplify the error logic in 
{{FlinkDeploymentController.reconcile()}}

Currently:
 * checked exceptions are wrapped in runtime exceptions
 * validation errors are terminal errors but handled differently
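
For illustration only, the kind of unified handling this could converge on might 
look like the sketch below; every name in it is made up for the example and is 
not the current operator code.
{code:java}
// Hypothetical sketch of unified reconcile() error handling; all names here are
// made up for illustration and are not the current operator code.
final class ReconcileErrorHandlingSketch {

    static class ValidationException extends Exception {}

    static class ReconciliationException extends RuntimeException {
        ReconciliationException(Throwable cause) { super(cause); }
    }

    interface Step { void run() throws Exception; }

    static void reconcile(Step validation, Step reconciliation, Runnable recordTerminalError) {
        try {
            validation.run();        // validation failures are terminal
            reconciliation.run();    // may throw checked exceptions
        } catch (ValidationException e) {
            recordTerminalError.run();            // record in status, do not retry
        } catch (Exception e) {
            throw new ReconciliationException(e); // wrap checked exceptions once, let the SDK retry
        }
    }
}
{code}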

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot commented on pull request #19428: [FLINK-25694] [file-system] Upgrade Presto

2022-04-11 Thread GitBox


flinkbot commented on PR #19428:
URL: https://github.com/apache/flink/pull/19428#issuecomment-1095522423

   
   ## CI report:
   
   * 8cb988ee3e60ba740733ece9b96bde1c86d5bef4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25694) GSON/Alluxio Vulnerability

2022-04-11 Thread David Perkins (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520798#comment-17520798
 ] 

David Perkins commented on FLINK-25694:
---

A new version was finally released. Created a PR with the update: 
https://github.com/apache/flink/pull/19428

> GSON/Alluxio Vulnerability
> --
>
> Key: FLINK-25694
> URL: https://issues.apache.org/jira/browse/FLINK-25694
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.14.2
>Reporter: David Perkins
>Priority: Major
>  Labels: pull-request-available
>
> GSON has a bug, which was fixed in 2.8.9, see 
> https://github.com/google/gson/pull/1991. 
> This results in the possibility of DoS attacks.
> GSON is included in the `flink-s3-fs-presto` plugin, because Alluxio includes 
> it in their shaded client. I've opened an issue in Alluxio: 
> [https://github.com/Alluxio/alluxio/issues/14868]. When that is fixed, the 
> plugin also needs to be updated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25694) GSON/Alluxio Vulnerability

2022-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-25694:
---
Labels: pull-request-available  (was: )

> GSON/Alluxio Vulnerability
> --
>
> Key: FLINK-25694
> URL: https://issues.apache.org/jira/browse/FLINK-25694
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.14.2
>Reporter: David Perkins
>Priority: Major
>  Labels: pull-request-available
>
> GSON has a bug, which was fixed in 2.8.9, see 
> https://github.com/google/gson/pull/1991. 
> This results in the possibility of DoS attacks.
> GSON is included in the `flink-s3-fs-presto` plugin, because Alluxio includes 
> it in their shaded client. I've opened an issue in Alluxio: 
> [https://github.com/Alluxio/alluxio/issues/14868]. When that is fixed, the 
> plugin also needs to be updated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] David-N-Perkins opened a new pull request, #19428: [FLINK-25694] [file-system] Upgrade Presto

2022-04-11 Thread GitBox


David-N-Perkins opened a new pull request, #19428:
URL: https://github.com/apache/flink/pull/19428

   ## What is the purpose of the change
   * Updated Presto to the latest version due to the GSON bug
   
   ## Brief change log
   - Updated the Presto library to `.272`
   
   ## Verifying this change
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: yes 
   
   ## Documentation
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

2022-04-11 Thread GitBox


flinkbot commented on PR #19427:
URL: https://github.com/apache/flink/pull/19427#issuecomment-1095484285

   
   ## CI report:
   
   * 05707cf8955f190d65021d61c5afd8164e831315 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27140) Move JobResultStore dirty entry creation into ioExecutor

2022-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27140:
---
Labels: pull-request-available  (was: )

> Move JobResultStore dirty entry creation into ioExecutor
> 
>
> Key: FLINK-27140
> URL: https://issues.apache.org/jira/browse/FLINK-27140
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Matthias Pohl
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The {{FileSystemJobResultStore}} is thread-safe and, therefore, we can move 
> the dirty entry creation into the {{ioExecutor}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] zentol opened a new pull request, #19427: [FLINK-27140][coordination] Write job result in ioExecutor

2022-04-11 Thread GitBox


zentol opened a new pull request, #19427:
URL: https://github.com/apache/flink/pull/19427

   Prevents the main thread from performing a potentially long-running operation 
by moving it to the IO executor instead.
   
   Some tests needed to be adjusted to rely on the runner termination future 
instead of the job result future: the latter does not guarantee that the runner 
is cleaned up, which these tests require because the mocked JMRunner doesn't 
implement all required methods (and once the runner is terminated, the 
dispatcher handles everything).
   This was previously fine because job execution, job termination and runner 
termination all happened synchronously, by virtue of every future already being 
completed and us never switching executors.
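   
   For illustration, the general pattern this follows is sketched below (not the 
actual change; the store interface is reduced to the one call that matters here 
and all names are placeholders):
   ```
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.Executor;
   
   // Sketch only: run the potentially slow, blocking job-result write on the IO
   // executor instead of the main thread; names are placeholders, not the real
   // Dispatcher code.
   final class DirtyResultWriteSketch {
   
       interface JobResultStoreLike {
           void createDirtyResult(Object entry) throws IOException;
       }
   
       static CompletableFuture<Void> writeDirtyResultAsync(
               JobResultStoreLike store, Object entry, Executor ioExecutor) {
           return CompletableFuture.runAsync(
                   () -> {
                       try {
                           store.createDirtyResult(entry); // blocking file system access
                       } catch (IOException e) {
                           throw new CompletionException(e);
                       }
                   },
                   ioExecutor);
       }
   }
   ```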


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26140) Add basic handling mechanism to deal with job upgrade errors

2022-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-26140:
---
Labels: pull-request-available  (was: )

> Add basic handling mechanism to deal with job upgrade errors
> 
>
> Key: FLINK-26140
> URL: https://issues.apache.org/jira/browse/FLINK-26140
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> There are various ways in which a stateful job upgrade can fail.
> For example:
> - Failure/timeout during savepoint
> - Incompatible state
> - Corrupted / not-found checkpoint
> - Error after restart
> We should allow some strategies for the user to declare how to handle the 
> different error scenarios (such as roll back to earlier state) and what 
> should be treated as a fatal error.
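
Purely as an illustration of what user-declared strategies could look like (the 
enums and the policy class below are made-up names for this example, not 
operator APIs):
{code:java}
import java.util.Map;

// Made-up illustration of per-failure-mode handling strategies.
enum UpgradeFailure { SAVEPOINT_FAILURE_OR_TIMEOUT, INCOMPATIBLE_STATE, CHECKPOINT_NOT_FOUND, ERROR_AFTER_RESTART }

enum UpgradeFailureAction { ROLL_BACK_TO_LAST_STABLE_SPEC, RETRY, TREAT_AS_FATAL }

final class UpgradeErrorPolicy {
    private final Map<UpgradeFailure, UpgradeFailureAction> actions;

    UpgradeErrorPolicy(Map<UpgradeFailure, UpgradeFailureAction> actions) {
        this.actions = actions;
    }

    UpgradeFailureAction actionFor(UpgradeFailure failure) {
        // anything not explicitly configured by the user is treated as fatal
        return actions.getOrDefault(failure, UpgradeFailureAction.TREAT_AS_FATAL);
    }
}
{code}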



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #165: [FLINK-26140] Support rollback strategies

2022-04-11 Thread GitBox


gyfora commented on PR #165:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/165#issuecomment-1095479699

   Opening this for review/discussion, I will work on adding tests for the core 
logic.
   
   cc @wangyang0918 @tweise @Aitozi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27189) Ability for table api to always add quotes to generated csv

2022-04-11 Thread shameet (Jira)
shameet created FLINK-27189:
---

 Summary: Ability for table api to always add quotes to generated 
csv
 Key: FLINK-27189
 URL: https://issues.apache.org/jira/browse/FLINK-27189
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: shameet


I am using the Table API in PyFlink to generate a CSV. What I noticed is that 
it conditionally adds quotes around the data. What I want is quotes around 
all the data.
The CSV is being created in S3.
 
 
E.g., in the output below the data in the last column was not quoted:
"[transaction_idgeorge.bl...@reqres.in|mailto:transaction_idgeorge.bl...@reqres.in]",card_hash,transaction_id
"[no3500957594177george.bl...@reqres.in|mailto:no3500957594177george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
"[no3500957594178george.bl...@reqres.in|mailto:no3500957594178george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
 
 
I had posted this question in the user community and Dian Fu suggested I could 
create a Jira issue, as this is not supported right now:
[https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn]
 
 
Sample code to create a CSV in S3 (the addme1 UDF and the table/path variables 
are defined elsewhere in the job):
 
from pyflink.table import EnvironmentSettings, TableEnvironment

# create the table environment
table_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

def create_source_table(table_name, input_path):
    return """ CREATE TABLE {0} (
        transaction_id VARCHAR(100),
        card_hash VARCHAR(100)
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{1}'
    ) """.format(table_name, input_path)

def create_sink_table(table_name, bucket_name):
    return """ CREATE TABLE {0} (
        transaction_id VARCHAR(100),
        card_hash VARCHAR(100),
        brand_id VARCHAR(100)
    ) with (
        'connector' = 'filesystem',
        'path' = '{1}',
        'format' = 'csv'
    ) """.format(table_name, bucket_name)

# 2. Creates a source table from a Kinesis Data Stream
table_env.execute_sql(
    create_source_table(input_table_name, input_file))

table_env.execute_sql(
    create_sink_table(out_table_name, output_bucket_name))

# addme1 is a user-defined function defined elsewhere in the job
table_env.register_function("addme1", addme1)

source_table = table_env.from_path(input_table_name)
source_table.select(
    addme1(source_table.transaction_id),
    source_table.card_hash,
    source_table.transaction_id.alias('brand_id')
).execute_insert(out_table_name).wait()
 
 
apache-flink version - 1.13
python 3.8
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27188) Deprecate StreamingFileSink

2022-04-11 Thread Daisy Tsang (Jira)
Daisy Tsang created FLINK-27188:
---

 Summary: Deprecate StreamingFileSink
 Key: FLINK-27188
 URL: https://issues.apache.org/jira/browse/FLINK-27188
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.12.7
Reporter: Daisy Tsang
Assignee: Daisy Tsang
 Fix For: 1.12.7


The StreamingFileSink has been deprecated in favor of the unified FileSink 
since Flink 1.12.

 

This change is reflected in the docs, but not yet in the codebase.

 

https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

https://issues.apache.org/jira/browse/FLINK-19510

https://issues.apache.org/jira/browse/FLINK-20337
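
For reference, a minimal sketch of writing with the unified FileSink that 
replaces StreamingFileSink (row-encoded output; the DataStream and the output 
path are assumptions made for this example):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

final class FileSinkSketch {
    // Minimal sketch: the unified FileSink replacing StreamingFileSink for
    // row-encoded output; stream and outputPath are assumptions for the example.
    static void writeAsText(DataStream<String> stream, String outputPath) {
        FileSink<String> sink =
                FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                        .build();
        stream.sinkTo(sink);
    }
}
{code}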

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] blake-wilson commented on pull request #19417: [FLINK-20060][Connectors / Kinesis] Add a Collector to KinsesisDeserializationSchema

2022-04-11 Thread GitBox


blake-wilson commented on PR #19417:
URL: https://github.com/apache/flink/pull/19417#issuecomment-1095295771

   Test failure is from a Kafka integration test and appears to be unrelated
   ```
   2022-04-11T07:53:52.3560140Z Apr 11 07:53:52 Caused by: 
java.lang.IllegalStateException: Cannot mark for checkpoint 42, already marked 
for checkpoint 41
   2022-04-11T07:53:52.3560857Z Apr 11 07:53:52 at 
org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is

2022-04-11 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-27187:
---

 Summary: The attemptsPerUpload metric may be lower than it 
actually is
 Key: FLINK-27187
 URL: https://issues.apache.org/jira/browse/FLINK-27187
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Feifan Wang


The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the 
distribution of the number of attempts per upload.

In the current implementation, each successful attempt tries to update 
attemptsPerUpload with its attemptNumber.

But consider this case: 
 # attempt 1 times out, then attempt 2 is scheduled
 # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1

In fact there were two attempts, but attemptsPerUpload is updated with 1.

So, I think we should add an "actionAttemptsCount" field to 
RetryExecutor.RetriableActionAttempt. This field would be shared across all 
attempts to execute the same upload action and represent the number of upload 
attempts. The completed attempt should then use this field to update 
attemptsPerUpload.
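
A rough sketch of that idea, with made-up names rather than the actual 
RetryingExecutor classes: all attempts of one upload action share a counter of 
started attempts, and whichever attempt completes first reports that count.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Sketch only: the counter is shared across all attempts of the same upload
// action, so the reported value reflects every attempt that was started, not
// the attemptNumber of whichever attempt happened to finish first.
final class UploadActionAttempts {
    private final AtomicInteger attemptsStarted = new AtomicInteger();
    private final AtomicBoolean reported = new AtomicBoolean();

    void onAttemptStarted() {
        attemptsStarted.incrementAndGet();
    }

    void onAttemptCompleted(IntConsumer attemptsPerUploadHistogram) {
        if (reported.compareAndSet(false, true)) {
            attemptsPerUploadHistogram.accept(attemptsStarted.get());
        }
    }
}
{code}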

 

What do you think, [~ym], [~roman]?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #164: [FLINK-26871] Handle session job spec change

2022-04-11 Thread GitBox


Aitozi commented on PR #164:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/164#issuecomment-1095272470

   @gyfora I introduced the `SavepointObserver` and `JobStatusObserver` to 
solve the code duplication. Please take a look again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847507611


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials 
credentials) {
  * task managers.
  */
 @Override
-public void start() {
-LOG.info("Starting renewal task");
+public void start() throws Exception {
+checkState(renewalExecutor == null, "Manager is already started");
+
+if (!isRenewalPossible()) {
+LOG.info("Renewal is NOT possible, skipping to start renewal 
task");
+return;
+}
+
+ThreadFactory threadFactory =
+new ThreadFactoryBuilder()
+.setDaemon(true)
+.setNameFormat("Credential Renewal Thread")
+.build();
+renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   In the latest code `ComponentMainThreadExecutor` is null. Once I know where 
it should come from, it will be filled in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847504606


##
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;

Review Comment:
   I don't really agree that mocking is bad in general, but this case is 
special.
   JUnit 5 does not support PowerMock, so there is no other possibility than 
function override.
   Yeah, there is the possibility to add an interface, but I felt it would be 
overkill.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] whitecloud6688 commented on pull request #19416: FlinkSQL中创建的表结构信息在SparkSQL中不可见

2022-04-11 Thread GitBox


whitecloud6688 commented on PR #19416:
URL: https://github.com/apache/flink/pull/19416#issuecomment-1095242154

   
Thanks for the reply. Maybe I misunderstood: a table created in Flink SQL cannot 
have its data read from Spark SQL; to read it, you need to create an external 
table pointing to the path of the Flink SQL table. So running desc on the Flink 
table does not show the columns, but you can see the flink.schema description in 
the Table Properties. @luoyuxia 
@MartijnVisser 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27155) Reduce multiple reads to the same Changelog file in the same taskmanager during restore

2022-04-11 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520657#comment-17520657
 ] 

Feifan Wang commented on FLINK-27155:
-

Thanks for your reply [~roman]. I'd be glad to implement this proposal; could 
you assign this ticket to me? I'll discuss the specific implementation with you 
before coding if I am assigned this ticket.

> Reduce multiple reads to the same Changelog file in the same taskmanager 
> during restore
> ---
>
> Key: FLINK-27155
> URL: https://issues.apache.org/jira/browse/FLINK-27155
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>
> h3. Background
> In the current implementation, State changes of different operators in the 
> same taskmanager may be written to the same changelog file, which effectively 
> reduces the number of files and requests to DFS.
> But on the other hand, the current implementation also reads the same 
> changelog file multiple times on recovery. More specifically, the number of 
> times the same changelog file is accessed is related to the number of 
> ChangeSets contained in it. And since each read needs to skip the preceding 
> bytes, this network traffic is also wasted.
> The result is a lot of unnecessary requests to DFS when there are multiple 
> slots and keyed state in the same taskmanager.
> h3. Proposal
> We can reduce multiple reads to the same changelog file in the same 
> taskmanager during restore.
> One possible approach is to read the changelog file all at once the first 
> time it is accessed and cache it in memory or in a local file for a period of time.
> I think this could be a subtask of [v2 FLIP-158: Generalized incremental 
> checkpoints|https://issues.apache.org/jira/browse/FLINK-25842] .
> Hi [~ym], [~roman], what do you think?
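
A minimal sketch of the caching idea described above, with hypothetical names 
(not Flink code): the first read downloads the whole changelog file once per 
TaskManager, and later reads of the same file are served from the cached copy.
{code:java}
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one DFS download per changelog file per TaskManager
// during restore; subsequent state handles referring to the same file reuse
// the cached bytes instead of re-reading and skipping over the DFS stream.
final class ChangelogFileCacheSketch {

    interface RemoteReader {
        byte[] readFully(String path) throws IOException;
    }

    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
    private final RemoteReader remoteReader;

    ChangelogFileCacheSketch(RemoteReader remoteReader) {
        this.remoteReader = remoteReader;
    }

    byte[] read(String path) throws IOException {
        byte[] cached = cache.get(path);
        if (cached == null) {
            cached = remoteReader.readFully(path); // single remote read per file
            cache.put(path, cached);               // benign race: worst case, read twice
        }
        return cached;
    }
}
{code}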



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24700) Clarify semantics of filter, projection, partition, and metadata pushdown

2022-04-11 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520654#comment-17520654
 ] 

Francesco Guardiani commented on FLINK-24700:
-

Has this one already been solved, [~twalthr], by one of the previous interface 
reworking efforts?

> Clarify semantics of filter, projection, partition, and metadata pushdown
> -
>
> Key: FLINK-24700
> URL: https://issues.apache.org/jira/browse/FLINK-24700
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> FLINK-19903 has revealed a couple of shortcomings that occur when 
> implementing multiple ability interfaces. We should improve the documentation 
> and better define the semantics.
> - Push produced type not only for metadata pushdown but also projection push 
> down.
> - Clarify order of filter + projection
> - Clarify order of projection/filter + partition
> - Simplify handling of partition columns
> ...



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-26905) Re-add FlinkDeploymentList and FlinkSessionJobList classes

2022-04-11 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-26905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-26905.
--
Resolution: Fixed

Implemented via 
[bd835f8|https://github.com/apache/flink-kubernetes-operator/commit/bd835f8f70b73004a73321225e5298408ae3e0e9]

> Re-add FlinkDeploymentList and FlinkSessionJobList classes
> --
>
> Key: FLINK-26905
> URL: https://issues.apache.org/jira/browse/FLINK-26905
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Jaganathan Asokan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> Seems like removing these was a bad idea as it breaks the fabric8 java client 
> when using the POJO classes



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-kubernetes-operator] asfgit closed pull request #129: [FLINK-26905] Re-add FlinkDeploymentList and FlinkSessionJobList classes

2022-04-11 Thread GitBox


asfgit closed pull request #129: [FLINK-26905] Re-add FlinkDeploymentList and 
FlinkSessionJobList classes
URL: https://github.com/apache/flink-kubernetes-operator/pull/129


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

2022-04-11 Thread GitBox


infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847454084


##
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
 private static void executeOriginalVariant(StreamExecutionEnvironment env, 
ParameterTool pt)
 throws Exception {
+Duration maxOutOfOrderness = extractTimestamp(pt);
 KeyedStream source =
 env.addSource(createEventSource(pt))
 .name("EventSource")
 .uid("EventSource")
-
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+.assignTimestampsAndWatermarks(
+
WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I tried various things including 
   
   
`WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).createTimestampAssigner(createTimestampExtractor(pt)))`
   
   and 
   
   
`WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withTimestampAssigner(createTimestampExtractor(pt)))`
   
   but the types never seem to match.
   
   Not sure how to reconcile the fact that it is looking for a 
`TimestampAssignerSupplier` but getting a 
`BoundedOutOfOrdernessTimestampExtractor`.
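   
   For what it's worth, the usual way to bridge that gap is to keep 
`forBoundedOutOfOrderness` for the watermarks and provide the timestamp through 
`withTimestampAssigner` with a lambda, instead of passing the old extractor 
object. A rough sketch (the `Event` type and its `getEventTime()` accessor stand 
in for the test's record type):
   ```
   import java.time.Duration;
   
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   
   // Rough sketch: watermarks via forBoundedOutOfOrderness, timestamps via a
   // lambda; Event and getEventTime() stand in for the test's actual record type.
   static WatermarkStrategy<Event> boundedOutOfOrder(Duration maxOutOfOrderness) {
       return WatermarkStrategy.<Event>forBoundedOutOfOrderness(maxOutOfOrderness)
               .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime());
   }
   ```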
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19426: [FLINK-25928] Throw validation error early for CAST

2022-04-11 Thread GitBox


slinkydeveloper commented on code in PR #19426:
URL: https://github.com/apache/flink/pull/19426#discussion_r847453122


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java:
##
@@ -589,6 +591,37 @@ protected Boolean defaultMethod(LogicalType targetType) {
 }
 }
 
+/**
+ * Check if the source/target pair is not allowed and throw a {@link 
ValidationException} with a
+ * useful error message.
+ */

Review Comment:
   This javadoc is not accurate: if the source/target pair is not allowed in any 
case other than numeric to timestamp, this function won't fail.
   
   In general I don't much like the fact that we have both `validate` and 
`supportsExplicitCast`; is there a better way to solve this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19349: [FLINK-27043][table] Removing old csv format references

2022-04-11 Thread GitBox


slinkydeveloper commented on code in PR #19349:
URL: https://github.com/apache/flink/pull/19349#discussion_r847448720


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Using this annotation you can inject in the test method:
+ *
+ * 
+ *   {@link TableEnvironment}
+ *   {@link StreamExecutionEnvironment} (Java or Scala)
+ *   {@link StreamTableEnvironment} (Java or Scala)
+ * 
+ *
+ * The underlying parameter injector will infer automatically the type to 
use from the signature

Review Comment:
   Not sure I can add it in Scala, as the syntax highlighting won't work... 
Anyway, I included it in Java, is that ok?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] zentol commented on pull request #526: Announcement blogpost for the 1.15 release

2022-04-11 Thread GitBox


zentol commented on PR #526:
URL: https://github.com/apache/flink-web/pull/526#issuecomment-1095162237

   I would really like to see the experimental OpenAPI specification being part 
of the announcement. That's not something that existing users would stumble 
upon in the docs I'm afraid.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] twalthr commented on a diff in pull request #19349: [FLINK-27043][table] Removing old csv format references

2022-04-11 Thread GitBox


twalthr commented on code in PR #19349:
URL: https://github.com/apache/flink/pull/19349#discussion_r847357250


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Using this annotation you can inject in the test method:
+ *
+ * 
+ *   {@link TableEnvironment}
+ *   {@link StreamExecutionEnvironment} (Java or Scala)
+ *   {@link StreamTableEnvironment} (Java or Scala)
+ * 
+ *
+ * When using with {@link ParameterizedTest}, make sure the table 
environment parameter is the
+ * last one in the signature.
+ */
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(TableJUnitExtensions.TableEnvironmentParameterResolver.class)
+public @interface WithTableEnvironment {

Review Comment:
   Shall we move this to `table-test-utils`?



##
flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out:
##
@@ -42,4 +42,4 @@ Calc(select=[a, b, c], where=[(a > 10)])
   "side" : "second"
 } ]
   } ]
-}
\ No newline at end of file
+}

Review Comment:
   remove these file changes



##
flink-end-to-end-tests/test-scripts/test_batch_sql.sh:
##
@@ -72,11 +72,14 @@ set_config_key "taskmanager.numberOfTaskSlots" "1"
 start_cluster
 
 # The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
-$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath 
"file://${OUTPUT_FILE_PATH}" -sqlStatement \
+$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath 
"${OUTPUT_FILE_PATH}" -sqlStatement \
 "INSERT INTO sinkTable $(sqlJobQuery)"
 
+# Concat result
+cat ${OUTPUT_FILE_PATH}/* > ${OUTPUT_FILE_PATH}/result.csv

Review Comment:
   did you check that the appending order is deterministic?



##
flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java:
##
@@ -62,13 +63,17 @@ public static void main(String[] args) throws Exception {
 .registerTableSourceInternal("table1", new 
GeneratorTableSource(10, 100, 60, 0));
 ((TableEnvironmentInternal) tEnv)
 .registerTableSourceInternal("table2", new 
GeneratorTableSource(5, 0.2f, 60, 5));
-((TableEnvironmentInternal) tEnv)
-.registerTableSinkInternal(
-"sinkTable",
-new CsvTableSink(outputPath)
-.configure(
-new String[] {"f0", "f1"},
-new TypeInformation[] {Types.INT, 
Types.SQL_TIMESTAMP}));
+tEnv.createTemporaryTable(
+"sinkTable",
+TableDescriptor.forConnector("filesystem")
+.schema(
+Schema.newBuilder()
+.column("f0", DataTypes.INT())
+.column("f1", DataTypes.TIMESTAMP(3))
+.build())
+.option(FileSystemConnectorOptions.PATH, outputPath)
+.format(FormatDescriptor.forFormat("csv").build())

Review Comment:
   use `format("csv")`



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##
@@ -71,95 +73,82 @@ object TestTableSourceSinks {
  |  score DOUBLE,
  |  last STRING
  |) WITH (
- |  'connector.type' = 

[GitHub] [flink] flinkbot commented on pull request #19426: [FLINK-25928] Throw validation error early for CAST

2022-04-11 Thread GitBox


flinkbot commented on PR #19426:
URL: https://github.com/apache/flink/pull/19426#issuecomment-1095157121

   
   ## CI report:
   
   * 1dd50a02ec38485aa442a46ca439df7861565ed0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] matriv commented on pull request #19426: [FLINK-25928] Throw validation error early for CAST

2022-04-11 Thread GitBox


matriv commented on PR #19426:
URL: https://github.com/apache/flink/pull/19426#issuecomment-1095153446

   Added the validation only in `supportsExplicitCasting`, as there are 
cases where we do implicit casting between TIMESTAMP and numeric, e.g. in 
`TemporalTypesTest`: 
   ```
   testAllApis(
 'f2 + 10.days + 4.millis,
 "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND",
 "1990-10-24 10:20:45.127")
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-25928) Refactor timestamp<->number validation messages

2022-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-25928:
---
Labels: pull-request-available  (was: )

> Refactor timestamp<->number validation messages
> ---
>
> Key: FLINK-25928
> URL: https://issues.apache.org/jira/browse/FLINK-25928
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Marios Trivyzas
>Assignee: Marios Trivyzas
>Priority: Major
>  Labels: pull-request-available
>
> Move timestamp -> number and number -> timestamp validation messages which 
> suggest the usage of dedicated methods to a validation method in 
> `{*}LogicalTypeCasts{*}`, and call this validation early, before trying to 
> resolve the `{*}CastRule`{*}s.
> https://github.com/apache/flink/pull/18582#discussion_r797372485
> https://github.com/apache/flink/commit/29d8d03b1be818a64834e5ba670a83d8857111ab



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] matriv opened a new pull request, #19426: [FLINK-25928] Throw validation error early for CAST

2022-04-11 Thread GitBox


matriv opened a new pull request, #19426:
URL: https://github.com/apache/flink/pull/19426

   ## What is the purpose of the change
   
   Throw a validation error early, with a workaround suggestion, for
   casting between numeric types and `TIMESTAMP`/`TIMESTAMP_LTZ` before
   reaching the `CastRuleProvider`. This way we don't need to implement
   such errors in Cast Rules and we can throw the error message early,
   before reaching the code generator.
   
   
   ## Brief change log
   
 - Implement validation for from/to `CAST` pairs in `LogicalTypeCasts`
 - Remove the Numeric <-> Timestamp/Timestamp_ltz rules which were added 
only to throw this message
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`CastFunctionITCase`, `TemporalTypesTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847414464


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials 
credentials) {
  * task managers.
  */
 @Override
-public void start() {
-LOG.info("Starting renewal task");
+public void start() throws Exception {
+checkState(renewalExecutor == null, "Manager is already started");
+
+if (!isRenewalPossible()) {
+LOG.info("Renewal is NOT possible, skipping to start renewal 
task");
+return;
+}
+
+ThreadFactory threadFactory =

Review Comment:
   Since we plan to use existing executors this code will be deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847413385


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials 
credentials) {
  * task managers.
  */
 @Override
-public void start() {
-LOG.info("Starting renewal task");
+public void start() throws Exception {
+checkState(renewalExecutor == null, "Manager is already started");
+
+if (!isRenewalPossible()) {
+LOG.info("Renewal is NOT possible, skipping to start renewal 
task");
+return;
+}
+
+ThreadFactory threadFactory =
+new ThreadFactoryBuilder()
+.setDaemon(true)
+.setNameFormat("Credential Renewal Thread")
+.build();
+renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'm fine with using the already existing executors, but I don't see any 
`ComponentMainThreadExecutor` in `ResourceManager`. Can you point out where I 
can find it to pass to the DTM?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on pull request #19403: [FLINK-27120][docs-zh] Translate "Gradle" tab of development > conf > overview

2022-04-11 Thread GitBox


zoltar9264 commented on PR #19403:
URL: https://github.com/apache/flink/pull/19403#issuecomment-1095130896

   Thanks @RocMarshal, is there anything I need to do, such as squashing 
commits?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847395187


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}
 
-Changelog is a feature that aims to decrease checkpointing time and, 
therefore, end-to-end latency in exactly-once mode.
+
 
-Most commonly, checkpoint duration is affected by:
+### 介绍
 
-1. Barrier travel time and alignment, addressed by
-   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
-   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
-2. Snapshot creation time (so-called synchronous phase), addressed by 
asynchronous snapshots (mentioned [above]({{<
-   ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。
 
-Upload time can be decreased by [incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}).
-However, most incremental state backends perform some form of compaction 
periodically, which results in re-uploading the
-old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
-data tends to be very high in every checkpoint.
+一般情况下 checkpoint 持续时间受如下因素影响:
 
-With Changelog enabled, Flink uploads state changes continuously and forms a 
changelog. On checkpoint, only the relevant
-part of this changelog needs to be uploaded. The configured state backend is 
snapshotted in the
-background periodically. Upon successful upload, the changelog is truncated.
+1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) 
and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。
 
-As a result, asynchronous phase duration is reduced, as well as synchronous 
phase - because no data needs to be flushed
-to disk. In particular, long-tail latency is improved.
+2. 快照制作时间(所谓同步阶段), 可以通过异步快照解决(如[上文]({{<
+   ref "#the-embeddedrocksdbstatebackend">}})所述)。
 
-However, resource usage is higher:
+3. 快照上传时间(异步阶段)。
 
-- more files are created on DFS
-- more files can be left undeleted DFS (this will be addressed in the future 
versions in FLINK-25511 and FLINK-25512)
-- more IO bandwidth is used to upload state changes
-- more CPU used to serialize state changes
-- more memory used by Task Managers to buffer state changes
+可以用过[增量 checkpoints]({{< ref "#incremental-checkpoints" >}}) 
来减少上传时间。但是,大多数增量的状态后端会定期执行某种形式的合并,这会导致除了新的变更之外还要重新上传旧状态。在大规模部署中,每个 checkpoint 
中至少有一个 task 上传大量数据的可能性往往非常高。

Review Comment:
   Yes, I will fix it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847393806


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}

Review Comment:
   Hi @masteryhx , I also used "您" at first, until I saw  [Flink Translation 
Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications)
 .  Do you think I should change this to "您" or  everywhere else to "你" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847386103


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -38,6 +38,8 @@ under the License.
 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。
 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。
 
+
+
 # 可用的 State Backends

Review Comment:
   Hi @masteryhx , `available-state-backends` is in the English version; you can 
check it at 
   
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#available-state-backends



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847376435


##
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##
@@ -79,4 +93,48 @@ public void testAllProvidersLoaded() {
 assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
 assertFalse(delegationTokenManager.isProviderLoaded("throw"));
 }
+
+@Test
+public void isRenewalPossibleMustGiveBackFalseByDefault() throws 
IOException {
+UserGroupInformation ugi = 
PowerMockito.mock(UserGroupInformation.class);
+PowerMockito.mockStatic(UserGroupInformation.class);
+when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+
+ExceptionThrowingDelegationTokenProvider.enabled = false;
+Configuration configuration = new Configuration();
+KerberosDelegationTokenManager delegationTokenManager =
+new KerberosDelegationTokenManager(configuration);
+
+assertFalse(delegationTokenManager.isRenewalPossible());
+}
+
+@Test
+public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws 
IOException {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese

2022-04-11 Thread GitBox


zoltar9264 commented on code in PR #19252:
URL: https://github.com/apache/flink/pull/19252#discussion_r847374465


##
docs/content.zh/docs/ops/state/state_backends.md:
##
@@ -306,77 +329,75 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
-## Enabling Changelog
+
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
 
-{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
 
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}
 
-Changelog is a feature that aims to decrease checkpointing time and, 
therefore, end-to-end latency in exactly-once mode.
+
 
-Most commonly, checkpoint duration is affected by:
+### 介绍
 
-1. Barrier travel time and alignment, addressed by
-   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
-   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
-2. Snapshot creation time (so-called synchronous phase), addressed by 
asynchronous snapshots (mentioned [above]({{<
-   ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。
 
-Upload time can be decreased by [incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}).
-However, most incremental state backends perform some form of compaction 
periodically, which results in re-uploading the
-old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
-data tends to be very high in every checkpoint.
+一般情况下 checkpoint 持续时间受如下因素影响:
 
-With Changelog enabled, Flink uploads state changes continuously and forms a 
changelog. On checkpoint, only the relevant
-part of this changelog needs to be uploaded. The configured state backend is 
snapshotted in the
-background periodically. Upon successful upload, the changelog is truncated.
+1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) 
and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。

Review Comment:
   Sorry, it's my oversight; I will fix it.
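   For readers following the passage being translated, a minimal sketch of enabling Changelog in a job; this assumes the `enableChangelogStateBackend` API (the cluster-wide equivalent would be the `state.backend.changelog.enabled` configuration option) and is not part of this PR:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With Changelog enabled, state changes are uploaded continuously, so a checkpoint
        // only needs to ship the most recent part of the changelog instead of re-uploading
        // periodically compacted state files.
        env.enableChangelogStateBackend(true);
    }
}
```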






[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

2022-04-11 Thread GitBox


slinkydeveloper commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847359774


##
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I mean the timestamp extractor from the record
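   For context, `forBoundedOutOfOrderness` only configures the allowed out-of-orderness; extracting the event timestamp from each record still requires a timestamp assigner on the `WatermarkStrategy`. A minimal sketch of that pattern (the `Event` type and `getEventTime()` accessor are placeholders for illustration, not names from this PR):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkStrategySketch {

    /** Placeholder event type; only for illustration. */
    public static class Event {
        private final long eventTime;

        public Event(long eventTime) {
            this.eventTime = eventTime;
        }

        public long getEventTime() {
            return eventTime;
        }
    }

    public static WatermarkStrategy<Event> boundedOutOfOrderness() {
        // forBoundedOutOfOrderness(...) only bounds how late a record may arrive;
        // withTimestampAssigner(...) is what actually reads the timestamp from the record.
        return WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime());
    }
}
```

   The resulting strategy would then be passed to `assignTimestampsAndWatermarks(...)` as in the diff above.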






[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

2022-04-11 Thread GitBox


infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847355828


##
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   Are you sure? The `Duration maxOutOfOrderness = extractTimestamp(pt);` above already has the timestamp.







[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

2022-04-11 Thread GitBox


infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847354659


##
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
##
@@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                                Files.size(inputFile),
                                inputDir.toAbsolutePath().toString(),
                                containedFile.getFileName().toString()))
-                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+                .sinkTo(
+                        FileSink.forRowFormat(
+                                new org.apache.flink.core.fs.Path(outputPath),

Review Comment:
   Turns out I am using both! 
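   To make the two `Path` types explicit, here is a minimal sketch of the replacement sink (assuming `outputPath` holds the value of `params.getRequired("output")`; the class and method names are placeholders for illustration):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

public class FileSinkSketch {

    // The fully qualified org.apache.flink.core.fs.Path appears in the diff because the
    // test program also uses java.nio.file.Path ("using both"); with only the Flink Path
    // imported, the plain name can be used instead.
    public static FileSink<String> rowFormatSink(String outputPath) {
        return FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>())
                .build();
    }
}
```

   The stream would then be attached with `sinkTo(...)` in place of the deprecated `writeAsText(...)`.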






[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

2022-04-11 Thread GitBox


gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1095077528

   > Should the obtainDelegationTokens(Credentials credentials) be implemented? Maybe it could be a shared code path for the renewal (the one-time renewal that is scheduled)?
   
   I'm not sure what you mean here. TGT renewal and token obtainment are quite different in several respects:
   * They obtain different things
   * They run at different frequencies
   
   I would like to ask you to elaborate on this.
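   To illustrate the difference in cadence, a purely hypothetical sketch of two independent schedules; the runnables, thread pool, and intervals are placeholders, not the KerberosDelegationTokenManager implementation:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RenewalCadenceSketch {

    // Hypothetical only: TGT re-login and delegation token obtainment refresh different
    // credentials and run at different frequencies, so they do not naturally share a code path.
    public static ScheduledExecutorService schedule(Runnable tgtRelogin, Runnable obtainTokens) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // TGT re-login cadence is driven by the Kerberos ticket lifetime.
        scheduler.scheduleAtFixedRate(tgtRelogin, 1, 1, TimeUnit.HOURS);
        // Token obtainment cadence is driven by each provider's token renewal interval.
        scheduler.scheduleAtFixedRate(obtainTokens, 12, 12, TimeUnit.HOURS);
        return scheduler;
    }
}
```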
   




