[jira] [Updated] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object
[ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Akshay Hazari updated FLINK-27193:
--
Description:

We have a unit test that checks whether Kryo serialisation followed by deserialisation yields an equal value, but it fails. The KryoSerializer and deserializer are used like this:

{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<out T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
{code}

The unit test simply looks like this:

{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}
{code}

Below is the expected vs. actual assertion error. The record is huge, and all of the components' hashes come out different; I am not sure how the record is modified.
{code:java}
org.opentest4j.AssertionFailedError:
expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK : 10
  Merge Flag : false
  Error Const : 0.5887050112577373
  RSE : 0.01839703160180429
  Seed Hash : 93cc | 37836
  Num Coupons : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window : false
  Valid PairTable: true
  Window Offset : 0
  KxP : 1023.375
  HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
  Empty : false
  Direct, Capacity bytes : false,
  Estimation Mode : false
  K : 128
  N : 2
  Levels (Needed, Total, Valid): 0, 0, 0
  Level Bit Pattern : 0
  BaseBufferCount : 2
  Combined Buffer Capacity : 4
  Retained Items : 2
  Compact Storage Bytes : 48
  Updatable Storage Bytes : 64
  Normalized Rank Error : 1.406%
  Normalized Rank Error (PMF) : 1.711%
  Min Value : 1.00e+00
  Max Value : 3.00e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
  n: 1
  min: 3.0
  max: 3.0
  sum: 3.0
  mean: 3.0
  geometric mean: 3.0004
  variance: 0.0
  population variance: 0.0
  second moment: 0.0
  sum of squares: 9.0
  standard deviation: 0.0
  sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK : 10
  Merge Flag : false
  Error Const : 0.5887050112577373
  RSE : 0.01839703160180429
  Seed Hash : 93cc | 37836
  Num Coupons : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window : false
  Valid PairTable: true
  Window Offset : 0
  KxP : 1023.375
  HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
  Empty : false
  Direct, Capacity bytes : false,
  Estimation Mode : false
  K : 128
  N : 2
  Levels (Needed, Total, Valid): 0, 0, 0
  Level Bit Pattern : 0
  BaseBufferCount : 2
  Combined Buffer Capacity : 4
  Retained Items : 2
  Compact Storage Bytes : 48
  Updatable Storage Bytes : 64
  Normalized Rank Error : 1.406%
  Normalized Rank Error (PMF) : 1.711%
  Min Value : 1.00e+00
  Max Value : 3.00e+00
### END SKETCH SUMMARY
,
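One detail worth noting in the failure above: the expected and actual dumps are field-for-field identical. That is consistent with the round trip itself succeeding while one of the record's components (for example a sketch object) lacks a value-based equals(), so the outer data class's equals() falls back to reference comparison for that field. The following self-contained Java sketch (hypothetical class names, not the reporter's code) reproduces the symptom:

```java
// Hypothetical stand-in: mimics a component (such as a CPC sketch) that
// does NOT override equals(), so it compares by object identity.
class Sketch {
    final int coupons;
    Sketch(int coupons) { this.coupons = coupons; }
    @Override public String toString() { return "Sketch(coupons=" + coupons + ")"; }
}

// Mimics a Kotlin data class: equals() delegates to every component.
class FieldRecord {
    final String name;
    final Sketch sketch;
    FieldRecord(String name, Sketch sketch) { this.name = name; this.sketch = sketch; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof FieldRecord)) return false;
        FieldRecord r = (FieldRecord) o;
        // sketch.equals(...) is Object.equals -> reference equality!
        return name.equals(r.name) && sketch.equals(r.sketch);
    }
    @Override public int hashCode() { return name.hashCode(); }
    @Override public String toString() { return "FieldRecord(name=" + name + ", " + sketch + ")"; }
}

public class EqualsPitfall {
    public static void main(String[] args) {
        FieldRecord original = new FieldRecord("foo", new Sketch(2));
        // What a lossless serialize/deserialize round trip produces:
        // an equal-looking but distinct object graph.
        FieldRecord roundTripped = new FieldRecord("foo", new Sketch(2));
        System.out.println(original.toString().equals(roundTripped.toString())); // true
        System.out.println(original.equals(roundTripped));                       // false
    }
}
```

If that is the cause here, comparing result.toString() with record.toString(), or comparing the re-serialized bytes, would make the round-trip test pass without any change to the serializer.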
[jira] [Updated] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object
[ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Akshay Hazari updated FLINK-27193:
--
Priority: Minor (was: Not a Priority)

> Kyro Serialisation and Deserialisation returns a different object
> --
>
> Key: FLINK-27193
> URL: https://issues.apache.org/jira/browse/FLINK-27193
> Project: Flink
> Issue Type: Bug
> Reporter: Akshay Hazari
> Priority: Minor
>
[jira] [Created] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object
Akshay Hazari created FLINK-27193:
-
Summary: Kyro Serialisation and Deserialisation returns a different object
Key: FLINK-27193
URL: https://issues.apache.org/jira/browse/FLINK-27193
Project: Flink
Issue Type: Bug
Reporter: Akshay Hazari
[jira] [Commented] (FLINK-27190) Revisit error handling in main reconcile() loop
[ https://issues.apache.org/jira/browse/FLINK-27190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520915#comment-17520915 ] Gyula Fora commented on FLINK-27190:

What improvements do you suggest?

> Revisit error handling in main reconcile() loop
> ---
>
> Key: FLINK-27190
> URL: https://issues.apache.org/jira/browse/FLINK-27190
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Matyas Orhidi
> Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
> There are some improvements introduced around error handling:
> * [https://github.com/java-operator-sdk/java-operator-sdk/pull/1033]
> in the upcoming java-operator-sdk release [v3.0.0.RC1.|https://github.com/java-operator-sdk/java-operator-sdk/releases/tag]
> We should revisit and further simplify the error logic in {{FlinkDeploymentController.reconcile()}}.
> Currently:
> * checked exceptions are wrapped in runtime exceptions
> * validation errors are terminal errors but are handled differently
-- This message was sent by Atlassian Jira (v8.20.1#820001)
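The two behaviours the ticket lists can be sketched generically. This is an assumption-laden illustration, not the actual FlinkDeploymentController code: a reconcile step that wraps checked (transient) exceptions into runtime exceptions so the framework retries, while treating validation failures as terminal errors that retrying cannot fix.

```java
import java.io.IOException;

// Generic sketch of the two error paths described in the ticket;
// class and method names here are hypothetical.
public class ReconcileSketch {
    // Terminal: an invalid spec stays invalid no matter how often we retry.
    static class ValidationException extends RuntimeException {
        ValidationException(String msg) { super(msg); }
    }

    static String reconcile(String spec) {
        try {
            if (spec.isEmpty()) {
                // Validation error: surface as-is, should not be retried.
                throw new ValidationException("spec must not be empty");
            }
            return deploy(spec);
        } catch (IOException e) {
            // Transient infrastructure failure: wrap the checked exception
            // in a runtime exception so the caller's retry loop kicks in.
            throw new RuntimeException(e);
        }
    }

    static String deploy(String spec) throws IOException {
        if (spec.equals("unreachable")) throw new IOException("API server unreachable");
        return "DEPLOYED " + spec;
    }

    public static void main(String[] args) {
        System.out.println(reconcile("my-flink-job")); // DEPLOYED my-flink-job
    }
}
```

One plausible simplification in this spirit is to catch the terminal type explicitly at the top of the loop and only wrap everything else, so the two cases are handled in one place.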
[GitHub] [flink] liuzhuang2017 commented on a diff in pull request #19413: [FLINK-16078] [docs-zh] Translate "Tuning Checkpoints and Large State…
liuzhuang2017 commented on code in PR #19413:
URL: https://github.com/apache/flink/pull/19413#discussion_r847981768

## docs/content.zh/docs/ops/state/large_state_tuning.md:

@@ -166,149 +125,101 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 }
 ```
-## Capacity Planning
-
-This section discusses how to decide how many resources should be used for a Flink job to run reliably.
-The basic rules of thumb for capacity planning are:
-
- - Normal operation should have enough capacity to not operate under constant *back pressure*.
-See [back pressure monitoring]({{< ref "docs/ops/monitoring/back_pressure" >}}) for details on how to check whether the application runs under back pressure.
-
- - Provision some extra resources on top of the resources needed to run the program back-pressure-free during failure-free time.
-These resources are needed to "catch up" with the input data that accumulated during the time the application
-was recovering.
-How much that should be depends on how long recovery operations usually take (which depends on the size of the state
-that needs to be loaded into the new TaskManagers on a failover) and how fast the scenario requires failures to recover.
-
-*Important*: The base line should to be established with checkpointing activated, because checkpointing ties up
-some amount of resources (such as network bandwidth).
-
- - Temporary back pressure is usually okay, and an essential part of execution flow control during load spikes,
-during catch-up phases, or when external systems (that are written to in a sink) exhibit temporary slowdown.
+## 容量规划
+本节讨论如何确定 Flink 作业应该使用多少资源才能可靠地运行。
+容量规划的基本经验法则是:
+ - 正常运行应该有足够的能力在恒定的*反压*下运行。

Review Comment: I have opened a hotfix PR for this at https://github.com/apache/flink/pull/19429; please take a look, thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27165) Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString became unstable
[ https://issues.apache.org/jira/browse/FLINK-27165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520914#comment-17520914 ] Anton Kalashnikov commented on FLINK-27165:
---
I can try to take a look. But I don't see how it relates to Java 11, since there were no changes at all for Java 8 (only a new Jenkins build for Java 11 was added). Also, according to this graph, Java 11 was added a little later than the instabilities first appeared (you can compare Java 8 and Java 11 [here|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,6=serializerHeavyString=on=on=off=2=1000]).

> Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString became unstable
> -
>
> Key: FLINK-27165
> URL: https://issues.apache.org/jira/browse/FLINK-27165
> Project: Flink
> Issue Type: Bug
> Components: Benchmarks
> Affects Versions: 1.15.0, 1.16.0
> Reporter: Matthias Pohl
> Priority: Major
> Labels: test-stability
>
> The benchmark test {{SerializationFrameworkMiniBenchmarks#serializerHeavyString}} became unstable in mid-January 2022, which cannot be explained (see [graph|http://codespeed.dak8s.net:8000/timeline/#/?exe=1=serializerHeavyString=on=on=off=2=1000]).
> There is some suspicion that it was caused by FLINK-25246, because Java 11 was added during that time.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847968836

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java:

@@ -391,7 +394,7 @@ static void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputSt
 dos.writeLong(handle.getStateSize());
 dos.writeLong(handle.getCheckpointedSize());
 writeStateHandleId(handle, dos);
-
+dos.writeUTF(handle.getStorageIdentifier());

Review Comment: I am thinking of a more common case: switching the changelog storage. There are some problems in the current implementation, which uses the same changelog storage to read and write. I think we should use the original changelog storage to read and a new one to write. That also makes sense for switching from enabled to disabled. But we cannot get the original changelog storage currently.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27192) Parquet File Read From Local
[ https://issues.apache.org/jira/browse/FLINK-27192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Karthik updated FLINK-27192:
Issue Type: Technical Debt (was: Bug)

> Parquet File Read From Local
>
> Key: FLINK-27192
> URL: https://issues.apache.org/jira/browse/FLINK-27192
> Project: Flink
> Issue Type: Technical Debt
> Components: Table SQL / API, Table SQL / Client
> Affects Versions: 1.13.6
> Environment: Flink-1.13.6
> Scala-2.11
> Reporter: Karthik
> Priority: Major
> Fix For: 1.13.6
>
> Hi, I want to read a parquet file from my local
> But it is asking for Hadoop Config
> Note: I'm using this jar flink-sql-parquet_2.11-1.13.6.jar
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'
JingsongLi commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847944688

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java:

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} to evaluate {@code filter like 'abc%' or filter like 'abc_'}. */
+public class StartsWith implements Predicate {

Review Comment: `like 'abc_'` does not look like a `StartsWith`. We can have a separate class to support it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
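The distinction the review draws can be illustrated with a minimal pattern classifier. This is a hypothetical sketch, not the table-store implementation: only a literal prefix followed by a single trailing '%' maps to a StartsWith check, while patterns containing '_' (matches exactly one character) or additional '%' wildcards are left for other predicate classes.

```java
import java.util.Optional;

// Hypothetical helper: extracts a literal prefix from a SQL LIKE pattern
// of the shape "abc%". Anything else (underscores, extra wildcards,
// escapes) is declared unsupported, mirroring the review comment that
// "abc_" must not be treated as StartsWith.
public class LikeToStartsWith {
    static Optional<String> startsWithPrefix(String pattern) {
        if (pattern.length() < 2 || !pattern.endsWith("%")) return Optional.empty();
        String prefix = pattern.substring(0, pattern.length() - 1);
        // '_' matches one arbitrary character and an earlier '%' is another
        // wildcard; both fall outside the simple StartsWith case.
        if (prefix.contains("%") || prefix.contains("_")) return Optional.empty();
        return Optional.of(prefix);
    }

    public static void main(String[] args) {
        System.out.println(startsWithPrefix("abc%"));  // Optional[abc]
        System.out.println(startsWithPrefix("abc_"));  // Optional.empty
        System.out.println(startsWithPrefix("a%c%"));  // Optional.empty
    }
}
```

A matching field-level evaluation would then simply be `value.startsWith(prefix)`, plus a statistics-based prune using the min/max strings of the column, which is what makes the predicate useful for file skipping.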
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'
JingsongLi commented on code in PR #87:
URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847942602

## flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java:

@@ -325,11 +345,29 @@ public void testUnsupportedExpression() {
 field(0, DataTypes.INT()), literal(3)),
 call(
-BuiltInFunctionDefinitions.LIKE,
+BuiltInFunctionDefinitions.SIMILAR,
 field(1, DataTypes.INT()),
 literal(5)));
 assertThatThrownBy(() -> expression.accept(CONVERTER))
 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+CallExpression endPattern =

Review Comment: There can be a `testUnsupportedLike` to test like only. The test cases can be:
- equalsPattern/endPattern/middlePattern
- equalsPattern/endPattern/middlePattern with escape
- equalsPattern/endPattern/middlePattern with '_'

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
[ https://issues.apache.org/jira/browse/FLINK-26761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-26761:
---
Fix Version/s: 1.16.0

> Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
>
> Key: FLINK-26761
> URL: https://issues.apache.org/jira/browse/FLINK-26761
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: zoucao
> Priority: Major
> Fix For: 1.16.0
>
> In `PreValidateReWriter#appendPartitionAndNullsProjects`, we should use
> {code:java}
> val names = sqlInsert.getTargetTableID.asInstanceOf[SqlIdentifier].names
> {code}
> to get the table name, instead of
> {code:java}
> val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
> {code}
> When we execute the following sql:
> {code:java}
> insert into/overwrite table_name /*+ options(xxx) */ partition(xxx) select
> {code}
> invoking `sqlInsert.getTargetTable` will get a SqlTableRef, which cannot be cast to SqlIdentifier.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
[ https://issues.apache.org/jira/browse/FLINK-26761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520902#comment-17520902 ] godfrey he commented on FLINK-26761:

[~zoucao] you are right, it's a bug. Would you like to fix it?

> Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
>
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-27192) Parquet File Read From Local
[ https://issues.apache.org/jira/browse/FLINK-27192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Karthik updated FLINK-27192:
Component/s: Table SQL / API, Table SQL / Client
Fix Version/s: 1.13.6
Affects Version/s: 1.13.6
Description: Hi, I want to read a parquet file from my local. But it is asking for Hadoop Config. Note: I'm using this jar flink-sql-parquet_2.11-1.13.6.jar
Environment: Flink-1.13.6, Scala-2.11
Summary: Parquet File Read From Local (was: Hi,)
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-27189) Support to always add quotes for string data in CSV format
[ https://issues.apache.org/jira/browse/FLINK-27189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-27189: Summary: Support to always add quotes for string data in CSV format (was: Ability for table api to always add quotes to generated csv) > Support to always add quotes for string data in CSV format > -- > > Key: FLINK-27189 > URL: https://issues.apache.org/jira/browse/FLINK-27189 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: shameet >Priority: Major > > I am using the table api in pyflink to generate a csv . What i noticed is > that its conditionally adding quotes around the data. What I want is quotes > around all the data > csv is being created in s3 > > > e.g in output below the data in last column was not quoted > "[transaction_idgeorge.bl...@reqres.in|mailto:transaction_idgeorge.bl...@reqres.in]",card_hash,transaction_id > "[no3500957594177george.bl...@reqres.in|mailto:no3500957594177george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177 > "[no3500957594178george.bl...@reqres.in|mailto:no3500957594178george.bl...@reqres.in]","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178 > > > I had posted this question in user community and Dian FU suggested i could > create a Jira as this is not supported right now > [https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn] > > > sample code to create a csv in s3 > > def create_source_table(table_name, input_path): > return""" CREATE TABLE \{0} ( > transaction_id VARCHAR(100), > card_hash VARCHAR(100) > ) with ( > 'connector' = 'filesystem', > 'format' = 'csv', > 'path' = '\{1}' > ) """.format( > table_name, input_path) > def create_sink_table(table_name, bucket_name): > return""" CREATE TABLE \{0} ( > transaction_id VARCHAR(100), > card_hash VARCHAR(100), > brand_id VARCHAR(100) > ) > with ( > 
'connector'='filesystem', > 'path'='{1}', > 'format'='csv' > ) """.format( > table_name, bucket_name) > > 2. Creates a source table from a Kinesis Data Stream > table_env.execute_sql( > create_source_table( > input_table_name, input_file > ) > ) > table_env.execute_sql( > create_sink_table( > out_table_name, output_bucket_name > ) > ) > table_env.register_function("addme1", addme1) > source_table = table_env.from_path(input_table_name) > source_table.select(addme1(source_table.transaction_id), source_table.card_hash, > source_table.transaction_id.alias('brand_id')).execute_insert(out_table_name).wait() > > > apache-flink version - 1.13 > python 3.8 > -- This message was sent by Atlassian Jira (v8.20.1#820001)
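The requested always-quote behavior can be illustrated with Python's standard `csv` module. This is only a sketch of the two quoting modes, not PyFlink's CSV format options (which currently have no "quote everything" switch, per the mailing-list thread above):

```python
import csv
import io

# One row from the report: the hash has no delimiter characters, so a
# "quote only when needed" writer leaves it unquoted.
rows = [
    ["bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",
     "NO3500957594177"],
]

# QUOTE_MINIMAL: conditional quoting, comparable to what the reporter observes.
minimal = io.StringIO()
csv.writer(minimal, quoting=csv.QUOTE_MINIMAL).writerows(rows)

# QUOTE_ALL: the behavior this issue asks the Flink CSV format to support --
# every field is wrapped in quotes unconditionally.
quoted = io.StringIO()
csv.writer(quoted, quoting=csv.QUOTE_ALL).writerows(rows)

print(minimal.getvalue())  # fields left bare
print(quoted.getvalue())   # every field quoted
```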
[jira] [Updated] (FLINK-27189) Ability for table api to always add quotes to generated csv
[ https://issues.apache.org/jira/browse/FLINK-27189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-27189: Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile) (was: API / Python) > Ability for table api to always add quotes to generated csv > --- > > Key: FLINK-27189 > URL: https://issues.apache.org/jira/browse/FLINK-27189 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: shameet >Priority: Major > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27192) Hi,
Karthik created FLINK-27192: --- Summary: Hi, Key: FLINK-27192 URL: https://issues.apache.org/jira/browse/FLINK-27192 Project: Flink Issue Type: Bug Reporter: Karthik -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-27138) flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE
[ https://issues.apache.org/jira/browse/FLINK-27138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520899#comment-17520899 ] yanbiao commented on FLINK-27138: - [~martijnvisser] got it, thanks for your help :) > flink1.14.0-sql-job submit failed:Flink doesn't support individual window > table-valued function TUMBLE > -- > > Key: FLINK-27138 > URL: https://issues.apache.org/jira/browse/FLINK-27138 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission, Table SQL / Client >Affects Versions: 1.14.0 > Environment: CentOS-7 > flink 1.14.0 Release >Reporter: yanbiao >Priority: Critical > Attachments: 3个or正常.png, webUI报错信息.png, 部署目录.png > > > I am working with a standalone deployment of flink-1.14.0-bin-scala_2.11; > the package was downloaded from the official website: > [https://flink.apache.org/downloads.html] > Then I submit the job to the standalone cluster via the REST interface [ > /jars//run]. > In most conditions it works well (when the where clause is simple or has no *or* > operator), > but when I add some conditions with an or operator to the where clause, the submission > fails with an exception. Even more surprising is that > [where A or B] and [where A or B or C] are OK, > but when there are more than 3 or conditions, like [where A or B or C or D], it *is not OK;* > I have tried more situations and they behaved differently, > and it works well to add an always-true condition before the or conditions, for example: > [where 1=1 and A or B or C] > *I have asked for advice in the official community DingTalk group and WeChat > group;* > *the admin told me that it should be a BUG and suggested opening an issue.* > The following message is the stacktrace: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Currently Flink doesn't support individual window > table-valued function TUMBLE(time_col=[ts], size=[10 min]). > Please use window table-valued function with the following computations: > 1. aggregate using window_start and window_end as group keys. > 2. 
topN using window_start and window_end as partition key. > 3. join with join condition contains window starts equality of input tables > and window ends equality of input tables. > the sql content is: > CREATE TABLE source22 ( > `timestamp` VARCHAR, > `logLevel` VARCHAR, > `threadName` VARCHAR, > `componentId` VARCHAR, > `stackTrace` VARCHAR, > `logType` VARCHAR, > `eventType` VARCHAR, > `subType` VARCHAR, > `operateType` VARCHAR, > `operateTag` VARCHAR, > `weight` INT, > `operator` VARCHAR, > `authRoles` VARCHAR, > `sourceHost` VARCHAR, > `restUri` VARCHAR, > `restMethod` VARCHAR, > `operateObj` VARCHAR, > `operateResult` VARCHAR, > `requestParams` VARCHAR, > `triggerCondition` VARCHAR, > `authType` VARCHAR, > `dataSize` INT, > `exceptionMsg` VARCHAR, > ts as TO_TIMESTAMP(`timestamp`,'yyyy-MM-dd HH:mm:ss.SSS'), > WATERMARK FOR ts AS ts - INTERVAL '10' SECOND > ) WITH ( > 'connector' = 'kafka', > 'format' = 'json', > 'properties.bootstrap.servers' = '10.192.78.27:9092', > 'scan.startup.mode' = 'latest-offset', > 'topic' = 'topic22', > 'properties.group.id' = 'groupId_22' > ) > CREATE TABLE sink22 ( > `id` VARCHAR, > `rule_key` VARCHAR, > `rule_name` VARCHAR, > `metric_threshold` INT, > `audit_status` INT, > `audit_comment_num` INT, > `window_start` TIMESTAMP(3), > `window_end` TIMESTAMP(3), > `metric_count` BIGINT, > PRIMARY KEY (`id`) NOT ENFORCED > ) WITH ( > 'connector' = 'elasticsearch-7', > 'hosts' = 'http://10.192.78.27:39200', > 'index' = 'index22' > ) > INSERT INTO sink22 > SELECT uuid() as id ,'22' as rule_key ,'4 or condition test' as rule_name ,2 > as metric_threshold ,0 as audit_status ,0 as audit_comment_num > ,window_start,window_end ,count(*) as metric_count > FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' SECOND)) > WHERE logType='operation' and (componentId='a' or componentId='b' or > componentId='c' or componentId='d' ) > GROUP BY window_start,window_end > HAVING count(*) > 2 > > the main class in the jar is: > public class 
AuditRuleJob { > public static void main(String[] args) { > final ParameterTool params = ParameterTool.fromArgs(args); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setGlobalJobParameters(params); > env.setParallelism(1); > > env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, > 30)); > env.enableCheckpointing(6); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); >
[jira] [Assigned] (FLINK-26829) ClassCastException will be thrown when the second operand of divide is a function call
[ https://issues.apache.org/jira/browse/FLINK-26829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-26829: -- Assignee: luoyuxia > ClassCastException will be thrown when the second operand of divide is a > function call > --- > > Key: FLINK-26829 > URL: https://issues.apache.org/jira/browse/FLINK-26829 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Fix For: 1.16.0 > > > Can be reproduced by adding the following code in > SqlExpressionTest#testDivideFunctions: > {code:java} > testExpectedSqlException( > "1/POWER(5, 5)", divisorZeroException, classOf[ArithmeticException]) {code} > Then the method ExpressionReducer#skipAndValidateExprs will throw the > exception: > {code:java} > java.lang.ClassCastException: org.apache.calcite.rex.RexCall cannot be cast > to org.apache.calcite.rex.RexLiteral {code} > The following code casts the divide's second operand to RexLiteral, but it > may be a function call: > {code:java} > // according to BuiltInFunctionDefinitions, the DEVIDE's second op must be > numeric > assert(RexUtil.isDeterministic(divisionLiteral)) > val divisionComparable = { > > divisionLiteral.asInstanceOf[RexLiteral].getValue.asInstanceOf[Comparable[Any]] > } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
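The shape of the fix described above (check the node's type instead of casting unconditionally) can be sketched outside of Calcite. The classes below are hypothetical stand-ins for Calcite's RexLiteral/RexCall node types, not its real API:

```python
class RexLiteral:
    """A constant expression node, e.g. the 5 in 1/5."""
    def __init__(self, value):
        self.value = value

class RexCall:
    """A function-call node, e.g. POWER(5, 5) in 1/POWER(5, 5)."""
    def __init__(self, operator, operands):
        self.operator = operator
        self.operands = operands

def divisor_constant(node):
    """Return the divisor's constant value, or None when it is not a literal.

    An unconditional cast (the buggy asInstanceOf[RexLiteral]) would fail
    whenever the divisor is a function call rather than a literal.
    """
    if isinstance(node, RexLiteral):
        return node.value
    return None  # not a literal: skip the constant divide-by-zero pre-check

print(divisor_constant(RexLiteral(5)))             # 5
print(divisor_constant(RexCall("POWER", [5, 5])))  # None
```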
[jira] [Commented] (FLINK-25242) UDF with primitive int argument does not accept int values even after a not null filter
[ https://issues.apache.org/jira/browse/FLINK-25242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520898#comment-17520898 ] godfrey he commented on FLINK-25242: good catch~ > UDF with primitive int argument does not accept int values even after a not > null filter > --- > > Key: FLINK-25242 > URL: https://issues.apache.org/jira/browse/FLINK-25242 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: Caizhi Weng >Priority: Major > > Add the following test case to {{TableEnvironmentITCase}} to reproduce this > issue. > {code:scala} > @Test > def myTest(): Unit = { > tEnv.executeSql("CREATE TEMPORARY FUNCTION MyUdf AS > 'org.apache.flink.table.api.MyUdf'") > tEnv.executeSql( > """ > |CREATE TABLE T ( > | a INT > |) WITH ( > | 'connector' = 'values', > | 'bounded' = 'true' > |) > |""".stripMargin) > tEnv.executeSql("SELECT MyUdf(a) FROM T WHERE a IS NOT NULL").print() > } > {code} > UDF code > {code:scala} > class MyUdf extends ScalarFunction { > def eval(a: Int): Int = { > a + 1 > } > } > {code} > Exception stack > {code} > org.apache.flink.table.api.ValidationException: SQL validation failed. 
> Invalid function call: > default_catalog.default_database.MyUdf(INT) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:168) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:219) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736) > at > org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:97) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) >
[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl
weibozhao commented on code in PR #83: URL: https://github.com/apache/flink-ml/pull/83#discussion_r847916030 ## flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java: ## @@ -37,60 +38,68 @@ import java.io.OutputStream; /** - * Model data of {@link LogisticRegressionModel}. + * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}. * * This class also provides methods to convert model data from Table to Datastream, and classes * to save/load model data. */ -public class LogisticRegressionModelData { +public class LinearModelData { public DenseVector coefficient; +public long modelVersion; Review Comment: We need to talk about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl
weibozhao commented on code in PR #83: URL: https://github.com/apache/flink-ml/pull/83#discussion_r847915850 ## flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java: ## @@ -37,60 +38,68 @@ import java.io.OutputStream; /** - * Model data of {@link LogisticRegressionModel}. + * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}. * * This class also provides methods to convert model data from Table to Datastream, and classes * to save/load model data. */ -public class LogisticRegressionModelData { +public class LinearModelData { Review Comment: LinearModelData is shared by ftrl, lr, svm and linearReg, since these algorithms have the same model data format. We need to talk about this rename. If we rename it, I agree to move this linear model data to a common place. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-26757) change the default value of state.backend.rocksdb.restore-overlap-fraction-threshold
[ https://issues.apache.org/jira/browse/FLINK-26757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-26757. -- Resolution: Fixed merged in master: 4034d3cd6d13e88e2e5ca101510bf333e94a53fa > change the default value of > state.backend.rocksdb.restore-overlap-fraction-threshold > > > Key: FLINK-26757 > URL: https://issues.apache.org/jira/browse/FLINK-26757 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > Attachments: 截屏2022-03-21 上午11.50.28.png > > > `state.backend.rocksdb.restore-overlap-fraction-threshold` is used to control > how to restore a state handle; different thresholds can affect restore > performance. The behavior of deletion during restore was changed by FLINK-21321. > In theory, setting the default value to 0 is most suitable, since > `deleteRange()` takes less time than creating a new RocksDB instance and then > scanning and re-inserting the records. In fact, we also have some experimental data showing that > a default value of 0 is more suitable. Here is a comparison of > initialization times for different thresholds; we can see that the default > value of 0 takes less time. > !截屏2022-03-21 上午11.50.28.png! > -- This message was sent by Atlassian Jira (v8.20.1#820001)
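For reference, before this change users could opt into the same behavior by setting the option explicitly in `flink-conf.yaml`. The key is the real option named in the issue; with the new default it no longer needs to be set by hand:

```yaml
# Equivalent of the new default merged above: always prefer deleteRange()
# over rebuilding the RocksDB instance on restore.
state.backend.rocksdb.restore-overlap-fraction-threshold: 0
```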
[GitHub] [flink] Myasuka merged pull request #19324: [FLINK-26757][state] Change the default value of state.backend.rocksdb.restore-overlap-fraction-threshold to 0
Myasuka merged PR #19324: URL: https://github.com/apache/flink/pull/19324 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on a diff in pull request #19413: [FLINK-16078] [docs-zh] Translate "Tuning Checkpoints and Large State…
Myasuka commented on code in PR #19413: URL: https://github.com/apache/flink/pull/19413#discussion_r847908442 ## docs/content.zh/docs/ops/state/large_state_tuning.md: ## @@ -166,149 +125,101 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { } ``` -## Capacity Planning - -This section discusses how to decide how many resources should be used for a Flink job to run reliably. -The basic rules of thumb for capacity planning are: - - - Normal operation should have enough capacity to not operate under constant *back pressure*. -See [back pressure monitoring]({{< ref "docs/ops/monitoring/back_pressure" >}}) for details on how to check whether the application runs under back pressure. - - - Provision some extra resources on top of the resources needed to run the program back-pressure-free during failure-free time. -These resources are needed to "catch up" with the input data that accumulated during the time the application -was recovering. -How much that should be depends on how long recovery operations usually take (which depends on the size of the state -that needs to be loaded into the new TaskManagers on a failover) and how fast the scenario requires failures to recover. - -*Important*: The base line should to be established with checkpointing activated, because checkpointing ties up -some amount of resources (such as network bandwidth). - - - Temporary back pressure is usually okay, and an essential part of execution flow control during load spikes, -during catch-up phases, or when external systems (that are written to in a sink) exhibit temporary slowdown. 
+## 容量规划 +本节讨论如何确定 Flink 作业应该使用多少资源才能可靠地运行。 +容量规划的基本经验法则是: + - 正常运行应该有足够的能力在恒定的*反压*下运行。 +如何检查应用程序是否在反压下运行的详细信息,请参阅 [反压监控]({{< ref "docs/ops/monitoring/back_pressure" >}})。 + - 在无故障时间内无反压程序运行所需资源之上提供一些额外的资源。 +需要这些资源来“赶上”在应用程序恢复期间积累的输入数据。 +这通常取决于恢复操作需要多长时间(这取决于需要在故障转移时加载到新 TaskManager 中的状态大小)以及故障恢复的速度。 +*重要*:基线应该在开启 checkpointing 的情况下建立,因为 checkpointing 会占用一些资源(例如网络带宽)。 + - 临时反压通常是可以的,在负载峰值、追赶阶段或外部系统(写入接收器中)出现临时减速时,这是执行流控制的重要部分。 - - Certain operations (like large windows) result in a spiky load for their downstream operators: -In the case of windows, the downstream operators may have little to do while the window is being built, -and have a load to do when the windows are emitted. -The planning for the downstream parallelism needs to take into account how much the windows emit and how -fast such a spike needs to be processed. + - 某些操作(如大窗口)会导致其下游算子的负载激增: +在窗口的情况下,下游算子可能在构建窗口时几乎无事可做,而在窗口发出时有负载要做。 +下游并行度的设置需要考虑到窗口输出多少以及需要以多快的速度处理这种峰值。 -**Important:** In order to allow for adding resources later, make sure to set the *maximum parallelism* of the -data stream program to a reasonable number. The maximum parallelism defines how high you can set the programs -parallelism when re-scaling the program (via a savepoint). +**重要**:为了方便以后添加资源,请务必将数据流程序的*最大并行度*设置为合理的数字。 最大并行度定义了在重新缩放程序时(通过 savepoint )可以设置程序并行度的高度。 -Flink's internal bookkeeping tracks parallel state in the granularity of max-parallelism-many *key groups*. -Flink's design strives to make it efficient to have a very high value for the maximum parallelism, even if -executing the program with a low parallelism. +Flink 的内部以多个*键组(key groups)* 的最大并行度为粒度跟踪并行状态。 +Flink 的设计力求使最大并行度的值达到很高的效率,即使执行程序时并行度很低。 -## Compression - -Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses -the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java) but we are planning to support -custom compression algorithms in the future. 
Compression works on the granularity of key-groups in keyed state, i.e. -each key-group can be decompressed individually, which is important for rescaling. - -Compression can be activated through the `ExecutionConfig`: +## 压缩 +Flink 为所有 checkpoints 和 savepoints 提供可选的压缩(默认:关闭)。 目前,压缩总是使用 [snappy 压缩算法(版本 1.1.4)](https://github.com/xerial/snappy-java), +但我们计划在未来支持自定义压缩算法。 压缩作用于 keyed state 下 key-groups 的粒度,即每个 key-groups 可以单独解压缩,这对于重新缩放很重要。 +可以通过 `ExecutionConfig` 开启压缩: ```java ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setUseSnapshotCompression(true); ``` -Note The compression option has no impact on incremental snapshots, because they are using RocksDB's internal -format which is always using snappy compression out of the box. - -## Task-Local Recovery +注意 压缩选项对增量快照没有影响,因为它们使用的是 RocksDB 的内部格式,该格式始终使用开箱即用的 snappy 压缩。 -### Motivation +## Task 本地恢复 +### 问题引入 +在 Flink 的 checkpointing 中,每个 task 都会生成其状态快照,然后将其写入分布式存储。 每个 task 通过发送一个描述分布式存储中的位置状态的句柄,向 jobmanager 确认状态的成功写入。 +JobManager 反过来收集所有 tasks 的句柄并将它们捆绑到一个 checkpoint 对象中。 -In Flink's checkpointing, each task produces a snapshot of its state that is then written to a distributed store. Each task acknowledges -a successful write of the state to the job
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847906550 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -38,6 +38,8 @@ under the License. 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。 + + # 可用的 State Backends Review Comment: I checked this page again and found three other places with the same problem; all of them have been fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on PR #19252: URL: https://github.com/apache/flink/pull/19252#issuecomment-1095911460 Thanks for your review again @masteryhx, I have fixed the problems you mentioned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847905580 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Enabling Changelog + -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} +## 开启 Changelog -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} +{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} -### Introduction +{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}} -Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode. + -Most commonly, checkpoint duration is affected by: +### 介绍 -1. Barrier travel time and alignment, addressed by - [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) - and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) -2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{< - ref "#the-embeddedrocksdbstatebackend">}})) -4. Snapshot upload time (asynchronous phase) +Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。 -Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). -However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the -old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of -data tends to be very high in every checkpoint. +一般情况下 checkpoint 持续时间受如下因素影响: -With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant -part of this changelog needs to be uploaded. 
The configured state backend is snapshotted in the -background periodically. Upon successful upload, the changelog is truncated. +1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。 Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] luoyuxia commented on pull request #19400: [FLINK-25142][Connectors / Hive]Fix user-defined hive udtf initialize exception in hive dialect
luoyuxia commented on PR #19400: URL: https://github.com/apache/flink/pull/19400#issuecomment-1095903340 Thanks @Tartarus0zm for the contribution. It's a continuation of #17990. LGTM. @beyond1920 @wuchong Could you please help review & merge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27191) Support multi kerberos-enabled Hive clusters
[ https://issues.apache.org/jira/browse/FLINK-27191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27191: - Component/s: Connectors / Hive > Support multi kerberos-enabled Hive clusters > - > > Key: FLINK-27191 > URL: https://issues.apache.org/jira/browse/FLINK-27191 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > Fix For: 1.16.0 > > > Currently, to access a kerberos-enabled Hive cluster, users are expected to > add ker/secret in flink-conf. But this can only access one Hive cluster per > Flink cluster; we would also like to support multiple kerberos-enabled Hive > clusters in one Flink cluster. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27191) Support multi kerberos-enabled Hive clusters
luoyuxia created FLINK-27191: Summary: Support multi kerberos-enabled Hive clusters Key: FLINK-27191 URL: https://issues.apache.org/jira/browse/FLINK-27191 Project: Flink Issue Type: Improvement Reporter: luoyuxia Fix For: 1.16.0 Currently, to access a kerberos-enabled Hive cluster, users are expected to add ker/secret in flink-conf. But this can only access one Hive cluster per Flink cluster; we would also like to support multiple kerberos-enabled Hive clusters in one Flink cluster. -- This message was sent by Atlassian Jira (v8.20.1#820001)
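For context, the existing single-cluster setup goes through Flink's global Kerberos options in `flink-conf.yaml` (these option keys are real; the keytab path and principal are placeholders). Because they define one identity per Flink cluster, only one kerberized Hive cluster can be reached today, which is the limitation this issue targets:

```yaml
# One global Kerberos identity per Flink cluster.
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/hive-user.keytab
security.kerberos.login.principal: hive-user@EXAMPLE.COM
```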
[GitHub] [flink] luoyuxia commented on pull request #19423: [FLINK-27175][hive] Support to call Hive UDAF when the UDAF accepts one parameter with array type
luoyuxia commented on PR #19423: URL: https://github.com/apache/flink/pull/19423#issuecomment-1095873524 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #19429: [hotfix][docs] Fix the "capacity-planning" page of "Tuning Checkpoint…
flinkbot commented on PR #19429: URL: https://github.com/apache/flink/pull/19429#issuecomment-1095867338 ## CI report: * 9e092e3dfca1ea6effdf2edd26acdf9c590d5ac6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wenlong88 commented on pull request #19179: [FLINK-26756][table-planner] Fix the deserialization error for match recognize
wenlong88 commented on PR #19179: URL: https://github.com/apache/flink/pull/19179#issuecomment-1095863004 LGTM
[GitHub] [flink] liuzhuang2017 opened a new pull request, #19429: [hotfix][docs] Fix the "capacity-planning" page of "Tuning Checkpoint…
liuzhuang2017 opened a new pull request, #19429: URL: https://github.com/apache/flink/pull/19429 Fix the "capacity-planning" page of "Tuning Checkpoints and Large State" ## What is the purpose of the change Fix the "capacity-planning" page of "Tuning Checkpoints and Large State" ## Brief change log Fix the "capacity-planning" page of "Tuning Checkpoints and Large State" ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new 
feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'
LadyForest commented on code in PR #87: URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847890543 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java: ## @@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) { .map(FieldReferenceExpression::getFieldIndex) .map(IsNotNull::new) .orElseThrow(UnsupportedExpression::new); +} else if (func == BuiltInFunctionDefinitions.LIKE) { +FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); +if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot() +== LogicalTypeRoot.VARCHAR) { +String sqlPattern = +extractLiteral(fieldRefExpr.getOutputDataType(), children.get(1)) +.orElseThrow(UnsupportedExpression::new) +.value() +.toString(); +String escape = +children.size() <= 2 +? null +: extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2)) + .orElseThrow(UnsupportedExpression::new) +.value() +.toString(); +if (escape == null) { +Matcher matcher = BEGIN_PATTERN.matcher(sqlPattern); +if (matcher.matches()) { +return new StartsWith( +fieldRefExpr.getFieldIndex(), +matcher.group(1), +sqlPattern.endsWith("_")); Review Comment: > `sqlPattern.endsWith("_")` what it is mean? match one or more characters -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847887144 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Enabling Changelog + -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} +## 开启 Changelog -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} +{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} -### Introduction +{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}} Review Comment: Got it, thanks, I will change it.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847885936 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -38,6 +38,8 @@ under the License. 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。 + + # 可用的 State Backends Review Comment: Got it! Thanks for your patience in explaining, @masteryhx. I will fix it later.
[GitHub] [flink] masteryhx commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
masteryhx commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847882156 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Enabling Changelog + -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} +## 开启 Changelog -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} +{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} -### Introduction +{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}} Review Comment: I think it could be changed to "您" here, since the sentence is a warning, as described in the [Flink Translation Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications)
[GitHub] [flink] masteryhx commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
masteryhx commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847877301 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -38,6 +38,8 @@ under the License. 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。 + + # 可用的 State Backends Review Comment: The table of contents shown on the right is inconsistent; you can check it by running the doc server. We need to replace "#" with "##" so that the table of contents is rendered correctly. I think "选择合适的 State Backend" and "RocksDB State Backend 进阶" have a similar problem.
[jira] [Commented] (FLINK-27018) timestamp missing trailing zero when outputting to kafka
[ https://issues.apache.org/jira/browse/FLINK-27018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520882#comment-17520882 ] Yao Zhang commented on FLINK-27018: --- Hi [~fpaul] and [~jeff-zou], I reproduced this issue with Flink 1.14.2. After debugging I found the problem lies in SQL_TIMESTAMP_FORMAT. Could you please assign this issue to me? > timestamp missing trailing zero when outputting to kafka > --- > > Key: FLINK-27018 > URL: https://issues.apache.org/jira/browse/FLINK-27018 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.5 >Reporter: jeff-zou >Priority: Major > Attachments: kafka.png > > > the bug is described as follows: > > {code:java} > data in source: > 2022-04-02 03:34:21.260 > but after sink by sql, data in kafka: > 2022-04-02 03:34:21.26 > {code} > > The data is missing the trailing zero in Kafka. > > sql: > {code:java} > create table kafka_table (stime timestamp(3)) with ('connector'='kafka','format' = > 'json'); > insert into kafka_table select stime from (values(timestamp '2022-04-02 > 03:34:21.260')){code} > the value in kafka is: \{"stime":"2022-04-02 03:34:21.26"}, with the trailing zero missing.
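The trailing-zero loss described above is characteristic of a `java.time` formatter whose fractional-second field has a minimum width of zero: such a formatter trims trailing zeros when printing. The sketch below is illustrative only; the `FractionTrim` class and its two formatters are assumptions for demonstration, not Flink's actual `SQL_TIMESTAMP_FORMAT` definition, but they reproduce the reported behaviour:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class FractionTrim {
    // Variable-width fraction (minimum width 0, maximum 9):
    // trailing zeros are dropped, so 260 ms prints as ".26".
    static final DateTimeFormatter TRIMMING = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .toFormatter();

    // Fixed-width millisecond fraction: the trailing zero is preserved.
    static final DateTimeFormatter FIXED =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2022, 4, 2, 3, 34, 21, 260_000_000);
        System.out.println(TRIMMING.format(t)); // 2022-04-02 03:34:21.26
        System.out.println(FIXED.format(t));    // 2022-04-02 03:34:21.260
    }
}
```

A fix along these lines would switch the sink's formatter to a fixed-width fraction matching the declared `TIMESTAMP` precision.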
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #87: [FLINK-27123] Introduce filter predicate for 'LIKE'
JingsongLi commented on code in PR #87: URL: https://github.com/apache/flink-table-store/pull/87#discussion_r847865264 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java: ## @@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) { .map(FieldReferenceExpression::getFieldIndex) .map(IsNotNull::new) .orElseThrow(UnsupportedExpression::new); +} else if (func == BuiltInFunctionDefinitions.LIKE) { +FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); +if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot() +== LogicalTypeRoot.VARCHAR) { Review Comment: Family is `LogicalTypeFamily.CHARACTER_STRING`. Char also is OK. ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java: ## @@ -1224,6 +1226,144 @@ public void testQueryContainsDefaultFieldName() throws Exception { .containsOnly(changelogRow("+I", 1, "abc")); } +@Test +public void testLike() throws Exception { +rootPath = TEMPORARY_FOLDER.newFolder().getPath(); +tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode()); +List input = +Arrays.asList( +changelogRow("+I", 1, "test_1"), +changelogRow("+I", 2, "test_2"), +changelogRow("+I", 1, "test_%"), +changelogRow("+I", 2, "test%2"), +changelogRow("+I", 3, "university"), +changelogRow("+I", 4, "very"), +changelogRow("+I", 5, "yield")); +String id = registerData(input); +UUID randomSourceId = UUID.randomUUID(); +tEnv.executeSql( +String.format( +"create table `helper_source_%s` (f0 int, f1 string) with (" ++ "'connector' = 'values', " ++ "'bounded' = 'true', " ++ "'data-id' = '%s')", +randomSourceId, id)); + +UUID randomSinkId = UUID.randomUUID(); +tEnv.executeSql( +String.format( +"create table `managed_table_%s` with (" ++ "'%s' = '%s'" ++ ") like `helper_source_%s` " ++ "(excluding options)", +randomSinkId, FileStoreOptions.PATH.key(), 
rootPath, randomSourceId)); +tEnv.executeSql( Review Comment: To test the filter, the best way is to insert it multiple times so that multiple files are generated to achieve the effect we want to filter the files. ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java: ## @@ -95,9 +102,46 @@ public Predicate visit(CallExpression call) { .map(FieldReferenceExpression::getFieldIndex) .map(IsNotNull::new) .orElseThrow(UnsupportedExpression::new); +} else if (func == BuiltInFunctionDefinitions.LIKE) { +FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); +if (fieldRefExpr.getOutputDataType().getLogicalType().getTypeRoot() +== LogicalTypeRoot.VARCHAR) { +String sqlPattern = +extractLiteral(fieldRefExpr.getOutputDataType(), children.get(1)) +.orElseThrow(UnsupportedExpression::new) +.value() +.toString(); +String escape = +children.size() <= 2 +? null +: extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2)) + .orElseThrow(UnsupportedExpression::new) +.value() +.toString(); +if (escape == null) { +Matcher matcher = BEGIN_PATTERN.matcher(sqlPattern); +if (matcher.matches()) { +return new StartsWith( +fieldRefExpr.getFieldIndex(), +matcher.group(1), +sqlPattern.endsWith("_")); Review Comment: `sqlPattern.endsWith("_")` what it is mean? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL
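For readers following the review above: the pushdown under discussion rewrites a `LIKE` pattern ending in a single `%` into a prefix test that can be evaluated against file-level statistics. A minimal stand-alone sketch of that conversion (the `LikePushdown` class and its regex are assumptions for illustration, not the PR's actual code):

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LikePushdown {
    // A pattern such as 'abc%': a literal prefix containing no '%' or '_'
    // wildcards, followed by exactly one trailing '%'.
    private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%_]+)%");

    // Returns the literal prefix when the LIKE pattern is a pure prefix
    // match; empty otherwise, meaning no filter can be pushed down.
    static Optional<String> asStartsWith(String sqlPattern) {
        Matcher matcher = BEGIN_PATTERN.matcher(sqlPattern);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

`asStartsWith("test%")` yields `test`, while `"%test"` or `"te%st"` yield empty, since only a trailing `%` leaves a fixed prefix to compare against min/max statistics.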
[GitHub] [flink] beyond1920 closed pull request #19109: [FLINK-26681][hive] Support sql statement end with ";" for Hive dialect query
beyond1920 closed pull request #19109: [FLINK-26681][hive] Support sql statement end with ";" for Hive dialect query URL: https://github.com/apache/flink/pull/19109
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator of Ftrl
yunfengzhou-hub commented on code in PR #83: URL: https://github.com/apache/flink-ml/pull/83#discussion_r847844186 ## flink-ml-lib/src/main/java/org/apache/flink/ml/classification/ftrl/Ftrl.java: ## @@ -0,0 +1,395 @@ +/* Review Comment: Jira tickets for algorithms that are supposed to be added have all been created in advance. You can find the ticket for FTRL on https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=541. The ticket for FTRL is FLINK-20790. ## flink-ml-lib/src/test/java/org/apache/flink/ml/classification/FtrlTest.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.classification.ftrl.Ftrl; +import org.apache.flink.ml.classification.ftrl.FtrlModel; +import org.apache.flink.ml.classification.logisticregression.LogisticRegression; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; + +/** Tests {@link Ftrl} and {@link FtrlModel}. */ +public class FtrlTest extends AbstractTestBase { Review Comment: The test cases are arranged differently from existing practice. Let's add tests that each covers the following situations. 
- tests getting/setting parameters - tests the most common fit/transform process. - tests save/load. - tests getting/setting model data. - tests invalid inputs/corner cases. ## flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java: ## @@ -37,60 +38,68 @@ import java.io.OutputStream; /** - * Model data of {@link LogisticRegressionModel}. + * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}. * * This class also provides methods to convert model data from Table to Datastream, and classes * to save/load model data. */ -public class LogisticRegressionModelData { +public class LinearModelData { Review Comment: Could you please illustrate the relationship between FTRL and LogisticRegression, and other algorithms like LinearRegression? I'm not sure why we would like to rename `LogisticRegressionModelData` as `LinearModelData`. If after discussion we still agree that this renaming is reasonable, it would mean that the model data class neither belongs `logisticregresson` or `ftrl` package. We would need to place classes like this to a neutral package, like something named `common`. ## flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LinearModelData.java: ## @@ -37,60 +38,68 @@ import java.io.OutputStream; /** - * Model data of {@link LogisticRegressionModel}. + * Model data of {@link LogisticRegressionModel}, {@link FtrlModel}. * * This class also provides methods to convert model data from Table to Datastream, and
[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is
[ https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520859#comment-17520859 ] Feifan Wang commented on FLINK-27187: - I think it's ok. Should I open a new ticket to do this work, or update this ticket directly? > The attemptsPerUpload metric may be lower than it actually is > - > > Key: FLINK-27187 > URL: https://issues.apache.org/jira/browse/FLINK-27187 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Feifan Wang >Priority: Minor > > The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the > distribution of the number of attempts per upload. > In the current implementation, each successful attempt tries to update > attemptsPerUpload with its attemptNumber. > But consider this case: > # attempt 1 times out, then attempt 2 is scheduled > # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1 > In fact there are two attempts, but attemptsPerUpload is updated with 1. > So, I think we should add "actionAttemptsCount" to > RetryExecutor.RetriableActionAttempt, a field shared across all attempts > to execute the same upload action, representing the number of upload attempts. > The completed attempt should then use this field to update attemptsPerUpload. > > What do you think, [~ym], [~roman]?
[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is
[ https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520845#comment-17520845 ] Roman Khachatryan commented on FLINK-27187: --- How about "totalAttemptsPerUpload"?
[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is
[ https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520836#comment-17520836 ] Feifan Wang commented on FLINK-27187: - [~roman], how would you like to name the metric for "the total number of attempts"?
[jira] [Commented] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is
[ https://issues.apache.org/jira/browse/FLINK-27187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520833#comment-17520833 ] Roman Khachatryan commented on FLINK-27187: --- The number of the successful attempt is actually the intended meaning of the metric. But a metric for the total number of attempts can also be added.
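The fix discussed in this thread amounts to sharing one counter across all attempts of an upload action, so whichever attempt finishes first reports the total number of attempts started rather than its own attemptNumber. A minimal sketch of the idea; the `UploadAttempts` class and method names are hypothetical, not the actual `RetryExecutor` code:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class UploadAttempts {
    // Shared across every attempt of the same upload action,
    // playing the role of the proposed "actionAttemptsCount".
    private final AtomicInteger attemptsStarted = new AtomicInteger();

    // Called whenever an attempt (including a retry after timeout)
    // is scheduled.
    int startAttempt() {
        return attemptsStarted.incrementAndGet();
    }

    // Called by whichever attempt completes first: reporting the shared
    // total means "attempt 1 finishes after attempt 2 was scheduled"
    // records 2 attempts instead of 1.
    int attemptsForMetric() {
        return attemptsStarted.get();
    }
}
```

With the current implementation the winning attempt would report its own attemptNumber (1 in the race described above); reading the shared counter instead reports 2.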
[jira] [Commented] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520830#comment-17520830 ] Roman Khachatryan commented on FLINK-27132: --- Sorry, I mean the 5th item. And yes, I mean ZK or K8s or any other handle store. > AFAIU, the {{SharedStateRegistry}} only loads the data from > {{{}CompletedCheckpointStore{}}}, i.e. if the data is not present through > these checkpoints, it won't be discarded. Yes, but the (new) checkpoints that are still present there might still refer to the original files under the shared/ folder; these files could therefore be discarded. > CheckpointResourcesCleanupRunner might discard shared state of the initial > checkpoint > - > > Key: FLINK-27132 > URL: https://issues.apache.org/jira/browse/FLINK-27132 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0 >Reporter: Roman Khachatryan >Priority: Major > > When considering the following case: # A job starts from a checkpoint in > NO_CLAIM mode, with incremental checkpoints enabled > # It produces some new checkpoints and subsumes the original one (not > discarding shared state - before FLINK-24611 or after FLINK-26985) > # Job terminates abruptly > # The cleaner is started for that job > # ZK doesn't have the initial checkpoint, so the store will load only the > new checkpoints (created in 2). Shared state is registered > # The store is shut down - discarding all the checkpoints and also any > shared state > In -6- 5, if some checkpoint uses the initial state, it will also be discarded > > [~mapohl] could you please confirm this? > > cc: [~yunta]
[jira] [Updated] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22
[ https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-26070: --- Labels: pull-request-available stale-major (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized. > Update dependency numpy to >=1.21.0,<1.22 > - > > Key: FLINK-26070 > URL: https://issues.apache.org/jira/browse/FLINK-26070 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Reporter: Martijn Visser >Priority: Major > Labels: pull-request-available, stale-major > > PyFlink currently sets the required numpy version as >=1.14.3,<1.2 > We should update this to >=1.21.0,<1.22 > Updating numpy will also resolve numpy being flagged as vulnerable for > https://nvd.nist.gov/vuln/detail/CVE-2021-33430
[jira] [Updated] (FLINK-25962) Flink generated Avro schemas can't be parsed using Python
[ https://issues.apache.org/jira/browse/FLINK-25962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-25962: --- Labels: pull-request-available stale-major (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized. > Flink generated Avro schemas can't be parsed using Python > - > > Key: FLINK-25962 > URL: https://issues.apache.org/jira/browse/FLINK-25962 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.3 >Reporter: Ryan Skraba >Priority: Major > Labels: pull-request-available, stale-major > > Flink currently generates Avro schemas as records with the top-level name > {{"record"}} > Unfortunately, there is some inconsistency between Avro implementations in > different languages that may prevent this record from being read, notably > Python, which generates the error: > *avro.schema.SchemaParseException: record is a reserved type name* > (See the comment on FLINK-18096 for the full stack trace). > The Java SDK accepts this name, and there's an [ongoing > discussion|https://lists.apache.org/thread/0wmgyx6z69gy07lvj9ndko75752b8cn2] > about what the expected behaviour should be. This should be clarified and > fixed in Avro, of course. > Regardless of the resolution, the best practice (which is used almost > everywhere else in the Flink codebase) is to explicitly specify a top-level > namespace for an Avro record. We should use a default like: > {{{}org.apache.flink.avro.generated{}}}.
[jira] [Updated] (FLINK-21383) Docker image does not play well together with ConfigMap based flink-conf.yamls
[ https://issues.apache.org/jira/browse/FLINK-21383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-21383: --- Labels: auto-deprioritized-major usability (was: stale-major usability) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Docker image does not play well together with ConfigMap based flink-conf.yamls > -- > > Key: FLINK-21383 > URL: https://issues.apache.org/jira/browse/FLINK-21383 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, flink-docker >Affects Versions: 1.11.6, 1.12.7, 1.13.5, 1.14.3 >Reporter: Till Rohrmann >Priority: Minor > Labels: auto-deprioritized-major, usability > > Flink's Docker image does not play well together with ConfigMap based > flink-conf.yamls. The {{docker-entrypoint.sh}} script offers a few env > variables to overwrite configuration values (e.g. {{FLINK_PROPERTIES}}, > {{JOB_MANAGER_RPC_ADDRESS}}, etc.). The problem is that the entrypoint script > assumes that it can modify the existing {{flink-conf.yaml}}. This is not the > case if the {{flink-conf.yaml}} is based on a {{ConfigMap}}. > Making things worse, failures updating the {{flink-conf.yaml}} are not > reported. Moreover, the called {{jobmanager.sh}} and {{taskmanager.sh}} > scripts don't support to pass in dynamic configuration properties into the > processes. > I think the problem is that our assumption that we can modify the > {{flink-conf.yaml}} does not always hold true. If we updated the final > configuration from within the Flink process (dynamic properties and env > variables), then this problem could be avoided.
[jira] [Updated] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-27132: -- Description: When considering the following case: # A job starts from a checkpoint in NO_CLAIM mode, with incremental checkpoints enabled # It produces some new checkpoints and subsumes the original one (not discarding shared state - before FLINK-24611 or after FLINK-26985) # Job terminates abruptly # The cleaner is started for that job # ZK doesn't have the initial checkpoint, so the store will load only the new checkpoints (created in 2). Shared state is registered # The store is shut down - discarding all the checkpoints and also any shared state In -6- 5, if some checkpoint uses the initial state, it will also be discarded [~mapohl] could you please confirm this? cc: [~yunta] was: When considering the following case: # A job starts from a checkpoint in NO_CLAIM mode, with incremental checkpoints enabled # It produces some new checkpoints and subsumes the original one (not discarding shared state - before FLINK-24611 or after FLINK-26985) # Job terminates abruptly # The cleaner is started for that job # ZK doesn't have the initial checkpoint, so the store will load only the new checkpoints (created in 2). Shared state is registered # The store is shut down - discarding all the checkpoints and also any shared state In -6-5, if some checkpoint uses the initial state, it will also be discarded [~mapohl] could you please confirm this? 
cc: [~yunta] > CheckpointResourcesCleanupRunner might discard shared state of the initial > checkpoint > - > > Key: FLINK-27132 > URL: https://issues.apache.org/jira/browse/FLINK-27132 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0 >Reporter: Roman Khachatryan >Priority: Major > > When considering the following case: # A job starts from a checkpoint in > NO_CLAIM mode, with incremental checkpoints enabled > # It produces some new checkpoints and subsumes the original one (not > discarding shared state - before FLINK-24611 or after FLINK-26985) > # Job terminates abruptly > # The cleaner is started for that job > # ZK doesn't have the initial checkpoint, so the store will load only the > new checkpoints (created in 2). Shared state is registered > # The store is shut down - discarding all the checkpoints and also any > shared state > In -6- 5, if some checkpoint uses the initial state, it will also be discarded > > [~mapohl] could you please confirm this? > > cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
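A toy model of steps 5-6 above (hypothetical class and method names, not Flink's actual SharedStateRegistry API): the cleaner's store registers only the new checkpoints, so shutting it down discards a shared artifact that the unseen initial checkpoint still references.

```python
# Toy model of the reported scenario. The initial NO_CLAIM checkpoint (cp1)
# is not in ZK, so it is never registered; incremental checkpoints cp2/cp3
# re-use cp1's shared file "s1". Shutting the store down discards everything
# it knows about, including "s1".

class ToySharedStateRegistry:
    def __init__(self):
        self.artifacts = set()
        self.discarded = []

    def register(self, checkpoint):
        self.artifacts |= set(checkpoint["shared"])

    def shutdown_and_discard(self):
        # Discards all registered artifacts -- including ones that
        # originated in a checkpoint the store never loaded.
        self.discarded = sorted(self.artifacts)
        self.artifacts.clear()

cp2 = {"id": 2, "shared": ["s1", "s2"]}
cp3 = {"id": 3, "shared": ["s1", "s3"]}

registry = ToySharedStateRegistry()
for cp in (cp2, cp3):
    registry.register(cp)
registry.shutdown_and_discard()
# "s1" is now gone even though cp1 (outside the store) still points at it.
```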
[jira] [Commented] (FLINK-26811) Document CRD upgrade process
[ https://issues.apache.org/jira/browse/FLINK-26811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520813#comment-17520813 ] Ted Chang commented on FLINK-26811: --- [~wangyang0918] good reference. I will add that. > Document CRD upgrade process > > > Key: FLINK-26811 > URL: https://issues.apache.org/jira/browse/FLINK-26811 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Ted Chang >Priority: Major > Fix For: kubernetes-operator-0.1.0 > > > We need to document how to update the CRD with a newer version. During > development, we delete the old CRD and create it from scratch. In an > environment with existing deployments that isn't possible, as deleting the > CRD would wipe out all existing CRs. > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520811#comment-17520811 ] Steven Zhen Wu commented on FLINK-27101: [~yunta] a pluggable checkpoint trigger policy might work well here. If I understand you correctly, you are saying we can switch the triggered checkpoint from incremental to full in a customized policy based on some cron conditions. Triggering a savepoint is not a concern; it can be scheduled by the control plane. If we want to use the savepoint to break the incremental chain, we would need to redeploy the job from the savepoint. The redeployment could feel hacky. > Periodically break the chain of incremental checkpoint > -- > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing >Reporter: Steven Zhen Wu >Priority: Major > > Incremental checkpoint is almost a must for large-state jobs. It greatly > reduces the bytes uploaded to DFS per checkpoint. However, there are a few > implications from incremental checkpoint that are problematic for production > operations. Will use S3 as an example DFS for the rest of description. > 1. Because there is no way to deterministically know how far back the > incremental checkpoint can refer to files uploaded to S3, it is very > difficult to set S3 bucket/object TTL. In one application, we have observed > Flink checkpoint referring to files uploaded over 6 months ago. S3 TTL can > corrupt the Flink checkpoints. > S3 TTL is important for a few reasons > - purge orphaned files (like external checkpoints from previous deployments) > to keep the storage cost in check. This problem can be addressed by > implementing proper garbage collection (similar to JVM) by traversing the > retained checkpoints from all jobs and traverse the file references. But that > is an expensive solution from engineering cost perspective. > - Security and privacy. 
E.g., there may be requirement that Flink state can't > keep the data for more than some duration threshold (hours/days/weeks). > Application is expected to purge keys to satisfy the requirement. However, > with incremental checkpoint and how deletion works in RocksDB, it is hard to > set S3 TTL to purge S3 files. Even though those old S3 files don't contain > live keys, they may still be referrenced by retained Flink checkpoints. > 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a > result, restoring from checkpoint failed. With incremental checkpoint, it > usually doesn't help to try other older checkpoints, because they may refer > to the same corrupted file. It is unclear whether the corruption happened > before or during S3 upload. This risk can be mitigated with periodical > savepoints. > It all boils down to periodical full snapshot (checkpoint or savepoint) to > deterministically break the chain of incremental checkpoints. Search the jira > history, the behavior that FLINK-23949 [1] trying to fix is actually close to > what we would need here. > There are a few options > 1. Periodically trigger savepoints (via control plane). This is actually not > a bad practice and might be appealing to some people. The problem is that it > requires a job deployment to break the chain of incremental checkpoint. > periodical job deployment may sound hacky. If we make the behavior of full > checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be > an acceptable compromise. The benefit is that no job deployment is required > after savepoints. > 2. Build the feature in Flink incremental checkpoint. Periodically (with some > cron style config) trigger a full checkpoint to break the incremental chain. > If the full checkpoint failed (due to whatever reason), the following > checkpoints should attempt full checkpoint as well until one successful full > checkpoint is completed. > 3. 
For the security/privacy requirement, the main thing is to apply > compaction on the deleted keys. That could probably avoid references to the > old files. Is there any RocksDB compaction that can achieve full compaction, > removing old delete markers? Recent delete markers are fine. > [1] https://issues.apache.org/jira/browse/FLINK-23949 -- This message was sent by Atlassian Jira (v8.20.1#820001)
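Option 2 above (a cron-style forced full checkpoint, retried until one succeeds) could be modeled roughly like this sketch; the policy interface and names are assumptions, not an existing Flink API.

```python
import time

class PeriodicFullCheckpointPolicy:
    """Request a FULL checkpoint every `interval_seconds`; if the forced
    full checkpoint fails, keep requesting FULL until one succeeds, as
    described in option 2 of the ticket."""

    def __init__(self, interval_seconds, clock=time.monotonic):
        self.interval = interval_seconds
        self.clock = clock            # injectable for testing
        self.last_full = clock()
        self.full_pending = False     # a forced full attempt failed

    def next_type(self):
        if self.full_pending or self.clock() - self.last_full >= self.interval:
            return "FULL"
        return "INCREMENTAL"

    def on_completed(self, ckpt_type, success):
        if ckpt_type != "FULL":
            return
        if success:
            self.last_full = self.clock()
            self.full_pending = False
        else:
            # Failed full checkpoint: keep forcing FULL on later triggers.
            self.full_pending = True
```

The `full_pending` flag captures the requirement that a failed full checkpoint must not silently fall back to the incremental chain.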
[jira] [Created] (FLINK-27190) Revisit error handling in main reconcile() loop
Matyas Orhidi created FLINK-27190: - Summary: Revisit error handling in main reconcile() loop Key: FLINK-27190 URL: https://issues.apache.org/jira/browse/FLINK-27190 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Matyas Orhidi Fix For: kubernetes-operator-1.0.0 There are some improvements introduced around error handling: * [https://github.com/java-operator-sdk/java-operator-sdk/pull/1033] in the upcoming java-operator-sdk release [v3.0.0.RC1.|https://github.com/java-operator-sdk/java-operator-sdk/releases/tag] We should revisit and further simplify the error-handling logic in {{FlinkDeploymentController.reconcile()}}. Currently: * checked exceptions are wrapped in runtime exceptions * validation errors are terminal errors but are handled differently -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot commented on pull request #19428: [FLINK-25694] [file-system] Ugrade Presto
flinkbot commented on PR #19428: URL: https://github.com/apache/flink/pull/19428#issuecomment-1095522423 ## CI report: * 8cb988ee3e60ba740733ece9b96bde1c86d5bef4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25694) GSON/Alluxio Vulnerability
[ https://issues.apache.org/jira/browse/FLINK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520798#comment-17520798 ] David Perkins commented on FLINK-25694: --- A new version was finally released. Created a PR with the update: https://github.com/apache/flink/pull/19428 > GSON/Alluxio Vulnerability > -- > > Key: FLINK-25694 > URL: https://issues.apache.org/jira/browse/FLINK-25694 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, FileSystems >Affects Versions: 1.14.2 >Reporter: David Perkins >Priority: Major > Labels: pull-request-available > > GSON has a bug, which was fixed in 2.8.9, see > [https://github.com/google/gson/pull/1991|https://github.com/google/gson/pull/1991.] > This results in the possibility for DOS attacks. > GSON is included in the `flink-s3-fs-presto` plugin, because Alluxio includes > it in their shaded client. I've opened an issue in Alluxio: > [https://github.com/Alluxio/alluxio/issues/14868]. When that is fixed, the > plugin also needs to be updated. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25694) GSON/Alluxio Vulnerability
[ https://issues.apache.org/jira/browse/FLINK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-25694: --- Labels: pull-request-available (was: ) > GSON/Alluxio Vulnerability > -- > > Key: FLINK-25694 > URL: https://issues.apache.org/jira/browse/FLINK-25694 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, FileSystems >Affects Versions: 1.14.2 >Reporter: David Perkins >Priority: Major > Labels: pull-request-available > > GSON has a bug, which was fixed in 2.8.9, see > [https://github.com/google/gson/pull/1991|https://github.com/google/gson/pull/1991.] > This results in the possibility for DOS attacks. > GSON is included in the `flink-s3-fs-presto` plugin, because Alluxio includes > it in their shaded client. I've opened an issue in Alluxio: > [https://github.com/Alluxio/alluxio/issues/14868]. When that is fixed, the > plugin also needs to be updated. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] David-N-Perkins opened a new pull request, #19428: [FLINK-25694] [file-system] Ugrade Presto
David-N-Perkins opened a new pull request, #19428: URL: https://github.com/apache/flink/pull/19428 ## What is the purpose of the change * Updated presto to the latest version due to a GSON bug ## Brief change log - Updated presto library to `.272` ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: yes ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor
flinkbot commented on PR #19427: URL: https://github.com/apache/flink/pull/19427#issuecomment-1095484285 ## CI report: * 05707cf8955f190d65021d61c5afd8164e831315 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27140) Move JobResultStore dirty entry creation into ioExecutor
[ https://issues.apache.org/jira/browse/FLINK-27140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27140: --- Labels: pull-request-available (was: ) > Move JobResultStore dirty entry creation into ioExecutor > > > Key: FLINK-27140 > URL: https://issues.apache.org/jira/browse/FLINK-27140 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.0, 1.16.0 >Reporter: Matthias Pohl >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > The {{FileSystemJobResultStore}} is thread-safe and, therefore, we can move > the dirty entry creation into the {{ioExecutor}} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] zentol opened a new pull request, #19427: [FLINK-27140][coordination] Write job result in ioExecutor
zentol opened a new pull request, #19427: URL: https://github.com/apache/flink/pull/19427 Prevents the main thread from doing a potentially long running operation, moving it instead to the io executor. Needed to adjust some tests to rely on the runner termination future instead of the job result future, as the latter does not guarantee that the runner is cleaned up, which is required for these tests to pass because the mocked JMRunner doesn't implement all required methods (and if the runner is terminated then the dispatcher handles everything). This was previously fine because the job execution, job termination and runner termination happened synchronously by virtue of all futures already being completed and us never switching executors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-26140) Add basic handling mechanism to deal with job upgrade errors
[ https://issues.apache.org/jira/browse/FLINK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26140: --- Labels: pull-request-available (was: ) > Add basic handling mechanism to deal with job upgrade errors > > > Key: FLINK-26140 > URL: https://issues.apache.org/jira/browse/FLINK-26140 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > There are various different ways how a stateful job upgrade can fail. > For example: > - Failure/timeout during savepoint > - Incompatible state > - Corrupted / not-found checkpoint > - Error after restart > We should allow some strategies for the user to declare how to handle the > different error scenarios (such as roll back to earlier state) and what > should be treated as a fatal error. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #165: [FLINK-26140] Support rollback strategies
gyfora commented on PR #165: URL: https://github.com/apache/flink-kubernetes-operator/pull/165#issuecomment-1095479699 Opening this for review/discussion, I will work on adding tests for the core logic. cc @wangyang0918 @tweise @Aitozi -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-27189) Ability for table api to always add quotes to generated csv
shameet created FLINK-27189: --- Summary: Ability for table api to always add quotes to generated csv Key: FLINK-27189 URL: https://issues.apache.org/jira/browse/FLINK-27189 Project: Flink Issue Type: Improvement Components: API / Python Reporter: shameet I am using the Table API in PyFlink to generate a CSV. What I noticed is that it conditionally adds quotes around the data; what I want is quotes around all the data. The CSV is being created in S3. E.g., in the output below, the data in the last column was not quoted: "transaction_idgeorge.bl...@reqres.in",card_hash,transaction_id "no3500957594177george.bl...@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177 "no3500957594178george.bl...@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178 I had posted this question in the user community and Dian Fu suggested I could create a Jira as this is not supported right now [https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn] Sample code to create a CSV in S3: def create_source_table(table_name, input_path): return """ CREATE TABLE \{0} ( transaction_id VARCHAR(100), card_hash VARCHAR(100) ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '\{1}' ) """.format( table_name, input_path) def create_sink_table(table_name, bucket_name): return """ CREATE TABLE \{0} ( transaction_id VARCHAR(100), card_hash VARCHAR(100), brand_id VARCHAR(100) ) with ( 'connector'='filesystem', 'path'='\{1}', 'format'='csv' ) """.format( table_name, bucket_name) 2. 
Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_source_table( input_table_name, input_file ) ) table_env.execute_sql( create_sink_table( out_table_name, output_bucket_name ) ) table_env.register_function("addme1", addme1) source_table = table_env.from_path(input_table_name) source_table.select(addme1(source_table.transaction_id),source_table.card_hash, source_table.transaction_id.alias('brand_id')).execute_insert(out_table_name).wait() apache-flink version - 1.13, python 3.8 -- This message was sent by Atlassian Jira (v8.20.1#820001)
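Since the CSV format cannot force quoting today, one workaround outside Flink is to post-process the generated files with Python's standard csv module, which supports unconditional quoting via csv.QUOTE_ALL. A minimal sketch:

```python
import csv
import io

def requote_all(csv_text):
    """Rewrite CSV text so that every field is quoted (csv.QUOTE_ALL),
    regardless of whether the field contains special characters."""
    out = io.StringIO()
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    for row in csv.reader(io.StringIO(csv_text)):
        writer.writerow(row)
    return out.getvalue()
```

This would be applied to each produced file (e.g. after download from S3) until the connector itself offers an always-quote option.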
[jira] [Created] (FLINK-27188) Deprecate StreamingFileSink
Daisy Tsang created FLINK-27188: --- Summary: Deprecate StreamingFileSink Key: FLINK-27188 URL: https://issues.apache.org/jira/browse/FLINK-27188 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.12.7 Reporter: Daisy Tsang Assignee: Daisy Tsang Fix For: 1.12.7 The StreamingFileSink has been deprecated in favor of the unified FileSink since Flink 1.12. This change is reflected in the docs, but not yet in the codebase. https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API https://issues.apache.org/jira/browse/FLINK-19510 https://issues.apache.org/jira/browse/FLINK-20337 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] blake-wilson commented on pull request #19417: [FLINK-20060][Connectors / Kinesis] Add a Collector to KinsesisDeserializationSchema
blake-wilson commented on PR #19417: URL: https://github.com/apache/flink/pull/19417#issuecomment-1095295771 Test failure is from a Kafka integration test and appears to be unrelated ``` 2022-04-11T07:53:52.3560140Z Apr 11 07:53:52 Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 42, already marked for checkpoint 41 2022-04-11T07:53:52.3560857Z Apr 11 07:53:52 at org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-27187) The attemptsPerUpload metric may be lower than it actually is
Feifan Wang created FLINK-27187: --- Summary: The attemptsPerUpload metric may be lower than it actually is Key: FLINK-27187 URL: https://issues.apache.org/jira/browse/FLINK-27187 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Feifan Wang The attemptsPerUpload metric in ChangelogStorageMetricGroup indicates the distribution of the number of attempts per upload. In the current implementation, each successful attempt tries to update attemptsPerUpload with its attemptNumber. But consider this case: # attempt 1 times out, then attempt 2 is scheduled # attempt 1 completes before attempt 2 and updates attemptsPerUpload with 1 In fact there were two attempts, but attemptsPerUpload was updated with 1. So, I think we should add "actionAttemptsCount" to RetryExecutor.RetriableActionAttempt, a field shared across all attempts to execute the same upload action, representing the number of upload attempts. The completed attempt should use this field to update attemptsPerUpload. What do you think? [~ym], [~roman] -- This message was sent by Atlassian Jira (v8.20.1#820001)
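The proposed shared counter can be modeled as follows (illustrative sketch; the class names mirror the proposal but this is not the actual RetryExecutor code):

```python
class UploadAction:
    """All attempts of one upload share this object, so whichever attempt
    completes first can report how many attempts were actually started."""

    def __init__(self):
        self.attempts_started = 0   # the proposed shared "actionAttemptsCount"

    def new_attempt(self):
        self.attempts_started += 1
        return Attempt(self, self.attempts_started)

class Attempt:
    def __init__(self, action, attempt_number):
        self.action = action
        self.attempt_number = attempt_number  # current per-attempt behaviour

    def report_metric(self):
        # Proposed fix: report the shared count, not this attempt's own number.
        return self.action.attempts_started

# Reproduce the race from the ticket:
action = UploadAction()
a1 = action.new_attempt()   # attempt 1 times out ...
a2 = action.new_attempt()   # ... so attempt 2 is scheduled
recorded = a1.report_metric()  # attempt 1 then completes first
```

With the current per-attempt number, attempt 1 would record 1; with the shared counter it records 2, matching the number of attempts actually started.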
[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #164: [FLINK-26871] Handle session job spec change
Aitozi commented on PR #164: URL: https://github.com/apache/flink-kubernetes-operator/pull/164#issuecomment-1095272470 @gyfora I introduced the `SavepointObserver` and `JobStatusObserver` to solve the code duplication. Please take a look again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r847507611 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java: ## @@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) { * task managers. */ @Override -public void start() { -LOG.info("Starting renewal task"); +public void start() throws Exception { +checkState(renewalExecutor == null, "Manager is already started"); + +if (!isRenewalPossible()) { +LOG.info("Renewal is NOT possible, skipping to start renewal task"); +return; +} + +ThreadFactory threadFactory = +new ThreadFactoryBuilder() +.setDaemon(true) +.setNameFormat("Credential Renewal Thread") +.build(); +renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); Review Comment: In the latest code `ComponentMainThreadExecutor` is null. When I know where it should come from, it will be filled in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r847504606 ## flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java: ## @@ -20,13 +20,27 @@ import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; Review Comment: I basically do not agree that mocking is bad in general, but this case is special. JUnit 5 does not support PowerMock, so there is no possibility other than a function override. Yeah, there is a possibility to add an interface, but I felt it would be overkill. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] whitecloud6688 commented on pull request #19416: Table schema created in FlinkSQL is not visible in SparkSQL
whitecloud6688 commented on PR #19416: URL: https://github.com/apache/flink/pull/19416#issuecomment-1095242154 Thanks for the reply. Maybe I misunderstood: data in tables created with FlinkSQL cannot be read from SparkSQL directly; to read it, you have to create an external table pointing to the path of the FlinkSQL table. So `desc` on the Flink table does not show the columns, although the `flink.schema` description is visible in the Table Properties. @luoyuxia @MartijnVisser
[jira] [Commented] (FLINK-27155) Reduce multiple reads to the same Changelog file in the same taskmanager during restore
[ https://issues.apache.org/jira/browse/FLINK-27155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520657#comment-17520657 ] Feifan Wang commented on FLINK-27155: - Thanks for your reply [~roman]. I'm glad to implement this proposal; can you assign this ticket to me? I'll discuss the specific implementation with you before coding if I'm assigned this ticket. > Reduce multiple reads to the same Changelog file in the same taskmanager > during restore > --- > > Key: FLINK-27155 > URL: https://issues.apache.org/jira/browse/FLINK-27155 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Feifan Wang >Priority: Major > > h3. Background > In the current implementation, state changes of different operators in the > same taskmanager may be written to the same changelog file, which effectively > reduces the number of files and requests to DFS. > But on the other hand, the current implementation also reads the same > changelog file multiple times on recovery. More specifically, the number of > times the same changelog file is accessed is related to the number of > ChangeSets contained in it. And since each read needs to skip the preceding > bytes, this network traffic is also wasted. > The result is a lot of unnecessary requests to DFS when there are multiple > slots and keyed state in the same taskmanager. > h3. Proposal > We can reduce multiple reads to the same changelog file in the same > taskmanager during restore. > One possible approach is to read the changelog file all at once and cache it > in memory or in a local file for a period of time. > I think this could be a subtask of [v2 FLIP-158: Generalized incremental > checkpoints|https://issues.apache.org/jira/browse/FLINK-25842] . > Hi [~ym] , [~roman] what do you think? -- This message was sent by Atlassian Jira (v8.20.1#820001)
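The "read once and cache" proposal above can be sketched in self-contained Java. This is only an illustration of the idea; the class and field names are hypothetical and not Flink's actual restore code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: read each changelog file from DFS once and serve all subsequent
 * ChangeSet reads from an in-memory cache, instead of one read per ChangeSet.
 */
class ChangelogFileCache {

    private final Map<Path, byte[]> cache = new HashMap<>();

    /** Counts actual file reads, purely for illustration. */
    int dfsReads = 0;

    /** Returns the bytes of one ChangeSet located at [offset, offset + length) of the file. */
    byte[] readChangeSet(Path file, int offset, int length) throws IOException {
        byte[] content = cache.get(file);
        if (content == null) {
            // Single full read of the file; later ChangeSets no longer need to
            // re-open the file and skip the preceding bytes.
            content = Files.readAllBytes(file);
            dfsReads++;
            cache.put(file, content);
        }
        return Arrays.copyOfRange(content, offset, offset + length);
    }
}
```

In a real implementation the cache would need an eviction policy (the ticket suggests keeping entries only "for a period of time") and possibly spilling to a local file instead of memory for large changelogs.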
[jira] [Commented] (FLINK-24700) Clarify semantics of filter, projection, partition, and metadata pushdown
[ https://issues.apache.org/jira/browse/FLINK-24700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520654#comment-17520654 ] Francesco Guardiani commented on FLINK-24700: - Has this one already been solved [~twalthr] by one of the previous interface reworking efforts? > Clarify semantics of filter, projection, partition, and metadata pushdown > - > > Key: FLINK-24700 > URL: https://issues.apache.org/jira/browse/FLINK-24700 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > FLINK-19903 has revealed a couple of shortcomings that occur when > implementing multiple ability interfaces. We should improve the documentation > and better define the semantics. > - Push produced type not only for metadata pushdown but also projection push > down. > - Clarify order of filter + projection > - Clarify order of projection/filter + partition > - Simplify handling of partition columns > ...
[jira] [Closed] (FLINK-26905) Re-add FlinkDeploymentList and FlinkSessionJobList classes
[ https://issues.apache.org/jira/browse/FLINK-26905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi closed FLINK-26905. -- Resolution: Fixed Implemented via [bd835f8|https://github.com/apache/flink-kubernetes-operator/commit/bd835f8f70b73004a73321225e5298408ae3e0e9] > Re-add FlinkDeploymentList and FlinkSessionJobList classes > -- > > Key: FLINK-26905 > URL: https://issues.apache.org/jira/browse/FLINK-26905 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Gyula Fora >Assignee: Jaganathan Asokan >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > Seems like removing these was a bad idea as it breaks the fabric8 java client > when using the POJO classes
[GitHub] [flink-kubernetes-operator] asfgit closed pull request #129: [FLINK-26905] Re-add FlinkDeploymentList and FlinkSessionJobList classes
asfgit closed pull request #129: [FLINK-26905] Re-add FlinkDeploymentList and FlinkSessionJobList classes URL: https://github.com/apache/flink-kubernetes-operator/pull/129
[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests
infoverload commented on code in PR #19414: URL: https://github.com/apache/flink/pull/19414#discussion_r847454084 ## flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java: ## @@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception { private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception { +Duration maxOutOfOrderness = extractTimestamp(pt); KeyedStream source = env.addSource(createEventSource(pt)) .name("EventSource") .uid("EventSource") - .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) +.assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)) Review Comment: I tried various things including `WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).createTimestampAssigner(createTimestampExtractor(pt)))` and `WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withTimestampAssigner(createTimestampExtractor(pt)))` but the types never seem to match. Not sure how to reconcile the fact that it is looking for a `TimestampAssignerSupplier` but getting a `BoundedOutOfOrdernessTimestampExtractor`.
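For what it's worth, a sketch of one way to reconcile the types: `withTimestampAssigner` expects a `SerializableTimestampAssigner` (or a `TimestampAssignerSupplier`), not the legacy `BoundedOutOfOrdernessTimestampExtractor`, so the extraction logic has to be re-expressed as a lambda. The `Event` interface and `getEventTime()` accessor below are hypothetical stand-ins for the test program's actual event type:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

class WatermarkStrategySketch {

    /** Hypothetical stand-in for the test program's event type. */
    interface Event {
        long getEventTime();
    }

    static WatermarkStrategy<Event> boundedStrategy(Duration maxOutOfOrderness) {
        // The legacy extractor cannot be passed to withTimestampAssigner directly;
        // its extractTimestamp() logic is re-expressed as a lambda. The explicit
        // cast to SerializableTimestampAssigner (together with the <Event> type
        // witness on forBoundedOutOfOrderness) resolves the overload ambiguity
        // with the TimestampAssignerSupplier variant.
        return WatermarkStrategy.<Event>forBoundedOutOfOrderness(maxOutOfOrderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>)
                                (event, recordTimestamp) -> event.getEventTime());
    }
}
```

This compiles against Flink's `flink-core` event-time API; whether it fits the test program depends on how `createTimestampExtractor(pt)` obtains the timestamp from the record.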
[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19426: [FLINK-25928] Throw validation error early for CAST
slinkydeveloper commented on code in PR #19426: URL: https://github.com/apache/flink/pull/19426#discussion_r847453122 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java: ## @@ -589,6 +591,37 @@ protected Boolean defaultMethod(LogicalType targetType) { } } +/** + * Check if the source/target pair is not allowed and throw a {@link ValidationException} with a + * useful error message. + */ Review Comment: This javadoc is not accurate: if the source/target pair is disallowed for any reason other than numeric to timestamp, this function won't fail. In general I don't much like the fact that we have both `validate` and `supportsExplicitCast`; is there a better way to solve this?
[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19349: [FLINK-27043][table] Removing old csv format references
slinkydeveloper commented on code in PR #19349: URL: https://github.com/apache/flink/pull/19349#discussion_r847448720 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java: ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.test; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Using this annotation you can inject in the test method: + * + * + * {@link TableEnvironment} + * {@link StreamExecutionEnvironment} (Java or Scala) + * {@link StreamTableEnvironment} (Java or Scala) + * + * + * The underlying parameter injector will infer automatically the type to use from the signature Review Comment: Not sure I can add it in scala, as the syntax highlight won't work... Anyway, I included it in Java, is it ok? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] zentol commented on pull request #526: Announcement blogpost for the 1.15 release
zentol commented on PR #526: URL: https://github.com/apache/flink-web/pull/526#issuecomment-1095162237 I would really like to see the experimental OpenAPI specification being part of the announcement. That's not something that existing users would stumble upon in the docs I'm afraid.
[GitHub] [flink] twalthr commented on a diff in pull request #19349: [FLINK-27043][table] Removing old csv format references
twalthr commented on code in PR #19349: URL: https://github.com/apache/flink/pull/19349#discussion_r847357250 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Using this annotation you can inject in the test method: + * + * + * {@link TableEnvironment} + * {@link StreamExecutionEnvironment} (Java or Scala) + * {@link StreamTableEnvironment} (Java or Scala) + * + * + * When using with {@link ParameterizedTest}, make sure the table environment parameter is the + * last one in the signature. 
+ */ +@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@ExtendWith(TableJUnitExtensions.TableEnvironmentParameterResolver.class) +public @interface WithTableEnvironment { Review Comment: Shall we move this to `table-test-utils`? ## flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out: ## @@ -42,4 +42,4 @@ Calc(select=[a, b, c], where=[(a > 10)]) "side" : "second" } ] } ] -} \ No newline at end of file +} Review Comment: remove these file changes ## flink-end-to-end-tests/test-scripts/test_batch_sql.sh: ## @@ -72,11 +72,14 @@ set_config_key "taskmanager.numberOfTaskSlots" "1" start_cluster # The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots -$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \ +$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "${OUTPUT_FILE_PATH}" -sqlStatement \ "INSERT INTO sinkTable $(sqlJobQuery)" +# Concat result +cat ${OUTPUT_FILE_PATH}/* > ${OUTPUT_FILE_PATH}/result.csv Review Comment: did you check that the appending order is deterministic? 
## flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java: ## @@ -62,13 +63,17 @@ public static void main(String[] args) throws Exception { .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0)); ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5)); -((TableEnvironmentInternal) tEnv) -.registerTableSinkInternal( -"sinkTable", -new CsvTableSink(outputPath) -.configure( -new String[] {"f0", "f1"}, -new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP})); +tEnv.createTemporaryTable( +"sinkTable", +TableDescriptor.forConnector("filesystem") +.schema( +Schema.newBuilder() +.column("f0", DataTypes.INT()) +.column("f1", DataTypes.TIMESTAMP(3)) +.build()) +.option(FileSystemConnectorOptions.PATH, outputPath) +.format(FormatDescriptor.forFormat("csv").build()) Review Comment: use `format("csv")` ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala: ## @@ -71,95 +73,82 @@ object TestTableSourceSinks { | score DOUBLE, | last STRING |) WITH ( - | 'connector.type' =
[GitHub] [flink] flinkbot commented on pull request #19426: [FLINK-25928] Throw validation error early for CAST
flinkbot commented on PR #19426: URL: https://github.com/apache/flink/pull/19426#issuecomment-1095157121 ## CI report: * 1dd50a02ec38485aa442a46ca439df7861565ed0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] matriv commented on pull request #19426: [FLINK-25928] Throw validation error early for CAST
matriv commented on PR #19426: URL: https://github.com/apache/flink/pull/19426#issuecomment-1095153446 Added the validation only in the `supportsExplicitCasting` as there are cases where we do implicit casting between TIMESTAMP and numeric, for example in `TemporalTypesTest`: ``` testAllApis( 'f2 + 10.days + 4.millis, "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND", "1990-10-24 10:20:45.127") ```
[jira] [Updated] (FLINK-25928) Refactor timestamp<->number validation messages
[ https://issues.apache.org/jira/browse/FLINK-25928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-25928: --- Labels: pull-request-available (was: ) > Refactor timestamp<->number validation messages > --- > > Key: FLINK-25928 > URL: https://issues.apache.org/jira/browse/FLINK-25928 > Project: Flink > Issue Type: Sub-task >Reporter: Marios Trivyzas >Assignee: Marios Trivyzas >Priority: Major > Labels: pull-request-available > > Move timestamp -> number and number -> timestamp validation messages which > suggest the usage of dedicated methods to a validation method in > `{*}LogicalTypeCasts{*}`, and call this validation early, before trying to > resolve the `{*}CastRule`{*}s. > https://github.com/apache/flink/pull/18582#discussion_r797372485 > https://github.com/apache/flink/commit/29d8d03b1be818a64834e5ba670a83d8857111ab
[GitHub] [flink] matriv opened a new pull request, #19426: [FLINK-25928] Throw validation error early for CAST
matriv opened a new pull request, #19426: URL: https://github.com/apache/flink/pull/19426 ## What is the purpose of the change Throw early a validation error with workaround suggestion for casting between numeric types and `TIMESTAMP`/`TIMESTAMP_LTZ` before reaching the `CastRuleProvider`. This way we don't need to implement such errors in Cast Rules and we can throw the error message early before reaching the code generator. ## Brief change log - Implement validation for from/to `CAST` pairs in `LogicalTypeCasts` - Remove the Numeric <-> Timestamp/Timestamp_ltz rules which were added only to throw this message ## Verifying this change This change is already covered by existing tests, such as `CastFunctionITCase`, `TemporalTypesTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable**
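The "throw early" idea described in the PR can be illustrated with a self-contained sketch. The real validation lives in Flink's `LogicalTypeCasts` and uses `LogicalTypeRoot` and `ValidationException`; the enum, exception type, and messages below are simplified stand-ins:

```java
import java.util.EnumSet;
import java.util.Set;

/** Self-contained sketch of failing fast for disallowed CAST pairs. */
class CastValidationSketch {

    // Simplified stand-in for Flink's LogicalTypeRoot.
    enum TypeRoot { INTEGER, BIGINT, DOUBLE, TIMESTAMP, TIMESTAMP_LTZ, VARCHAR }

    private static final Set<TypeRoot> NUMERIC =
            EnumSet.of(TypeRoot.INTEGER, TypeRoot.BIGINT, TypeRoot.DOUBLE);
    private static final Set<TypeRoot> TIMESTAMPS =
            EnumSet.of(TypeRoot.TIMESTAMP, TypeRoot.TIMESTAMP_LTZ);

    /**
     * Throws with a workaround suggestion before any cast rule resolution or
     * code generation is attempted.
     */
    static void validateExplicitCast(TypeRoot source, TypeRoot target) {
        if (NUMERIC.contains(source) && TIMESTAMPS.contains(target)) {
            throw new IllegalArgumentException(
                    "Casting a numeric type to "
                            + target
                            + " is not allowed; use a dedicated conversion function instead.");
        }
        if (TIMESTAMPS.contains(source) && NUMERIC.contains(target)) {
            throw new IllegalArgumentException(
                    "Casting "
                            + source
                            + " to a numeric type is not allowed; use a dedicated conversion function instead.");
        }
    }
}
```

Centralizing the check this way removes the need for cast rules whose only purpose is to produce an error message, which is exactly the cleanup the PR describes.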
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r847414464 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java: ## @@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) { * task managers. */ @Override -public void start() { -LOG.info("Starting renewal task"); +public void start() throws Exception { +checkState(renewalExecutor == null, "Manager is already started"); + +if (!isRenewalPossible()) { +LOG.info("Renewal is NOT possible, skipping to start renewal task"); +return; +} + +ThreadFactory threadFactory = Review Comment: Since we plan to use existing executors this code will be deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r847413385 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java: ## @@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) { * task managers. */ @Override -public void start() { -LOG.info("Starting renewal task"); +public void start() throws Exception { +checkState(renewalExecutor == null, "Manager is already started"); + +if (!isRenewalPossible()) { +LOG.info("Renewal is NOT possible, skipping to start renewal task"); +return; +} + +ThreadFactory threadFactory = +new ThreadFactoryBuilder() +.setDaemon(true) +.setNameFormat("Credential Renewal Thread") +.build(); +renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); Review Comment: I'm fine w/ using the already existing executors but I don't see any `ComponentMainThreadExecutor` in `ResourceManager`. Can you point where can I find it to pass to DTM? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on pull request #19403: [FLINK-27120][docs-zh] Translate "Gradle" tab of development > conf > overview
zoltar9264 commented on PR #19403: URL: https://github.com/apache/flink/pull/19403#issuecomment-1095130896 Thanks @RocMarshal! Is there anything I need to do, such as squashing the commits?
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847395187 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Enabling Changelog + -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} +## 开启 Changelog -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} +{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} -### Introduction +{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}} -Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode. + -Most commonly, checkpoint duration is affected by: +### 介绍 -1. Barrier travel time and alignment, addressed by - [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) - and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) -2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{< - ref "#the-embeddedrocksdbstatebackend">}})) -4. Snapshot upload time (asynchronous phase) +Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。 -Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). -However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the -old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of -data tends to be very high in every checkpoint. +一般情况下 checkpoint 持续时间受如下因素影响: -With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant -part of this changelog needs to be uploaded. 
The configured state backend is snapshotted in the -background periodically. Upon successful upload, the changelog is truncated. +1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。 -As a result, asynchronous phase duration is reduced, as well as synchronous phase - because no data needs to be flushed -to disk. In particular, long-tail latency is improved. +2. 快照制作时间(所谓同步阶段), 可以通过异步快照解决(如[上文]({{< + ref "#the-embeddedrocksdbstatebackend">}})所述)。 -However, resource usage is higher: +3. 快照上传时间(异步阶段)。 -- more files are created on DFS -- more files can be left undeleted DFS (this will be addressed in the future versions in FLINK-25511 and FLINK-25512) -- more IO bandwidth is used to upload state changes -- more CPU used to serialize state changes -- more memory used by Task Managers to buffer state changes +可以用过[增量 checkpoints]({{< ref "#incremental-checkpoints" >}}) 来减少上传时间。但是,大多数增量的状态后端会定期执行某种形式的合并,这会导致除了新的变更之外还要重新上传旧状态。在大规模部署中,每个 checkpoint 中至少有一个 task 上传大量数据的可能性往往非常高。 Review Comment: Yes, I will fix it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847393806 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Enabling Changelog + -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} +## 开启 Changelog -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} +{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} -### Introduction +{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}} Review Comment: Hi @masteryhx , I also used "您" at first, until I saw [Flink Translation Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications) . Do you think I should change this to "您" or everywhere else to "你" ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847386103 ## docs/content.zh/docs/ops/state/state_backends.md: ## @@ -38,6 +38,8 @@ under the License. 在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。 + + # 可用的 State Backends Review Comment: Hi @masteryhx , available-state-backends is in English version, you can check it with https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#available-state-backends -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r847376435 ## flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java: ## @@ -79,4 +93,48 @@ public void testAllProvidersLoaded() { assertTrue(ExceptionThrowingDelegationTokenProvider.constructed); assertFalse(delegationTokenManager.isProviderLoaded("throw")); } + +@Test +public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException { +UserGroupInformation ugi = PowerMockito.mock(UserGroupInformation.class); +PowerMockito.mockStatic(UserGroupInformation.class); +when(UserGroupInformation.getCurrentUser()).thenReturn(ugi); + +ExceptionThrowingDelegationTokenProvider.enabled = false; +Configuration configuration = new Configuration(); +KerberosDelegationTokenManager delegationTokenManager = +new KerberosDelegationTokenManager(configuration); + +assertFalse(delegationTokenManager.isRenewalPossible()); +} + +@Test +public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws IOException { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #19252: [FLINK-25867][docs] translate ChangelogBackend documentation to chinese
zoltar9264 commented on code in PR #19252: URL: https://github.com/apache/flink/pull/19252#discussion_r847374465

## docs/content.zh/docs/ops/state/state_backends.md:
## @@ -306,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 {{< top >}}
-## Enabling Changelog
+
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+## 开启 Changelog
-{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}}
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
-### Introduction
+{{< hint warning >}} 开启 Changelog 可能会给你的应用带来性能损失。(见下文) {{< /hint >}}
-Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode.
+
-Most commonly, checkpoint duration is affected by:
+### 介绍
-1. Barrier travel time and alignment, addressed by
-   [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
-   and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
-2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{< ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。
-Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}).
-However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the
-old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of
-data tends to be very high in every checkpoint.
+一般情况下 checkpoint 持续时间受如下因素影响:
-With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant
-part of this changelog needs to be uploaded. The configured state backend is snapshotted in the
-background periodically. Upon successful upload, the changelog is truncated.
+1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决。

Review Comment: Sorry, it's my oversight, I will fix it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
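The mechanism described in the diff's English source text (state changes are uploaded continuously, a checkpoint uploads only the outstanding delta, and the changelog is truncated once the periodic background snapshot of the configured state backend succeeds) can be sketched as a plain-Java model. The class and method names below are illustrative only, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model (not Flink's API): state changes are appended to a
// changelog as they happen; a checkpoint only needs to upload the changes
// accumulated since the last materialized snapshot; once the background
// snapshot succeeds, the changelog is truncated.
public class ChangelogModel {
    private final List<String> changelog = new ArrayList<>();

    public void applyChange(String change) {
        changelog.add(change);
    }

    // The delta a checkpoint would have to upload.
    public List<String> checkpointDelta() {
        return new ArrayList<>(changelog);
    }

    // The configured state backend finished its periodic background snapshot:
    // the changelog is truncated, so the next checkpoint delta is small again.
    public void onSnapshotMaterialized() {
        changelog.clear();
    }
}
```

This is why checkpoint upload time stays small even when the underlying incremental backend periodically compacts and would otherwise re-upload old state.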
[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests
slinkydeveloper commented on code in PR #19414: URL: https://github.com/apache/flink/pull/19414#discussion_r847359774

## flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
## @@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
         throws Exception {
+    Duration maxOutOfOrderness = extractTimestamp(pt);
     KeyedStream source =
             env.addSource(createEventSource(pt))
                     .name("EventSource")
                     .uid("EventSource")
-                    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                    .assignTimestampsAndWatermarks(
+                            WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment: I mean the timestamp extractor from the record
[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests
infoverload commented on code in PR #19414: URL: https://github.com/apache/flink/pull/19414#discussion_r847355828

## flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
## @@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
         throws Exception {
+    Duration maxOutOfOrderness = extractTimestamp(pt);
     KeyedStream source =
             env.addSource(createEventSource(pt))
                     .name("EventSource")
                     .uid("EventSource")
-                    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                    .assignTimestampsAndWatermarks(
+                            WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment: Are you sure? Because the `Duration maxOutOfOrderness = extractTimestamp(pt);` above already has the timestamp
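For context on the exchange above: the out-of-orderness bound only governs how far the watermark trails behind the highest timestamp seen, while extracting the event timestamp from each record is a separate concern, which is the distinction the reviewers are debating. The semantics can be modeled in plain Java; this is a sketch of the documented bounded-out-of-orderness behavior, not Flink's actual classes:

```java
// Sketch of bounded-out-of-orderness watermarking (modeled after the
// documented semantics, not Flink's implementation). Note that the bound
// only governs watermark generation; each record's event timestamp must
// still be extracted and fed in separately.
public class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // Called with the timestamp already extracted from a record.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the highest timestamp seen by the bound
    // (only meaningful once at least one event has been observed).
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

In other words, a `Duration` bound alone says nothing about where timestamps come from, which is why a per-record timestamp extractor is still needed alongside the bound.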
[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests
infoverload commented on code in PR #19414: URL: https://github.com/apache/flink/pull/19414#discussion_r847354659

## flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
## @@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                 Files.size(inputFile),
                 inputDir.toAbsolutePath().toString(),
                 containedFile.getFileName().toString()))
-        .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+        .sinkTo(
+                FileSink.forRowFormat(
+                        new org.apache.flink.core.fs.Path(outputPath),

Review Comment: Turns out I am using both!
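A side note on the fully qualified `new org.apache.flink.core.fs.Path(...)` in the diff above: when two classes share a simple name in one file (here, presumably Flink's `Path` alongside `java.nio.file.Path`, which the file already uses via `Files` and `toAbsolutePath()`), only one can be imported and the other must be written out in full. A stdlib-only illustration of the same situation, using the `Date` name clash:

```java
import java.util.Date; // the imported class wins the simple name "Date"

// Illustration of a simple-name clash: java.sql.Date shares the simple name,
// so it must be fully qualified, just like org.apache.flink.core.fs.Path in
// the diff above when java.nio.file.Path is also in use.
public class NameClashExample {
    static Date utilDate = new Date(0L);
    static java.sql.Date sqlDate = new java.sql.Date(0L);
}
```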
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1095077528

> Should the obtainDelegationTokens(Credentials credentials) be implemented? Maybe it could be a shared code path for the renewal (the one time renewal that is scheduled)?

Not sure what you mean here. TGT renewal and token obtainment are totally different in many respects:
* They obtain different things
* They run at different frequencies

I would like to ask you to elaborate on this.
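The argument above (TGT relogin and delegation-token obtainment obtain different things and run at different frequencies) suggests two independently scheduled tasks rather than one shared code path. A minimal, self-contained sketch of that shape; the class name, method names, and periods here are hypothetical and are not the `KerberosDelegationTokenManager` API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the two concerns are scheduled independently because
// they do different work and run at different frequencies, which is the
// argument made in the comment above.
public class RenewalSchedulerSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    public void start(Runnable tgtRelogin, long tgtPeriodMs,
                      Runnable obtainTokens, long tokenPeriodMs) {
        // Two independent fixed-rate schedules, one per concern.
        scheduler.scheduleAtFixedRate(
                tgtRelogin, tgtPeriodMs, tgtPeriodMs, TimeUnit.MILLISECONDS);
        scheduler.scheduleAtFixedRate(
                obtainTokens, tokenPeriodMs, tokenPeriodMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```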