[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-30935:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is truly Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> Many of the current Kafka serializer implementations do not perform a version 
> check during deserialization, while other implementations of 
> SimpleVersionedSerializer support it.
> We can add one, as many other connectors' implementations do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.
>  
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized)
>         throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> } {code}
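>
> For comparison, a version-checked variant might look like the sketch below 
> (the guard itself illustrates the proposal and is not an actual patch; the 
> names follow the snippet above):
> {code:java}
> import java.io.ByteArrayInputStream;
> import java.io.DataInputStream;
> import java.io.IOException;
>
> import org.apache.kafka.common.TopicPartition;
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized)
>         throws IOException {
>     // Hypothetical check, mirroring other SimpleVersionedSerializer
>     // implementations: reject bytes written with an unknown version.
>     if (version != CURRENT_VERSION) {
>         throw new IOException(
>                 String.format(
>                         "The bytes are serialized with version %d, but this "
>                                 + "deserializer only supports version %d.",
>                         version, CURRENT_VERSION));
>     }
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}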
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32044) Improve catalog name check to keep consistent about human-readable exception log in FunctionCatalog

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32044:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is truly Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Improve catalog name check to keep consistent about human-readable exception 
> log in FunctionCatalog 
> 
>
> Key: FLINK-32044
> URL: https://issues.apache.org/jira/browse/FLINK-32044
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.17.0
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> {code:java}
> Catalog catalog = catalogManager.getCatalog(catalogName).get(); {code}
>  
> We can improve this by checking the Optional before calling get() and throwing 
> a more user-friendly exception message, as the other list operations do.
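>
> A minimal sketch of the idea (the exact exception type and wording are 
> assumptions, not the final patch):
> {code:java}
> Catalog catalog =
>         catalogManager
>                 .getCatalog(catalogName)
>                 .orElseThrow(
>                         () ->
>                                 new ValidationException(
>                                         String.format(
>                                                 "Catalog '%s' does not exist.",
>                                                 catalogName)));
> {code}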



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31377) BinaryArrayData getArray/getMap should Handle null correctly AssertionError: valueArraySize (-6) should >= 0

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-31377:
---
  Labels: auto-deprioritized-major pull-request-available  (was: 
pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue, or revive the 
public discussion.


> BinaryArrayData getArray/getMap should Handle null correctly AssertionError: 
> valueArraySize (-6) should >= 0 
> -
>
> Key: FLINK-31377
> URL: https://issues.apache.org/jira/browse/FLINK-31377
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
>Reporter: Jacky Lau
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> You can reproduce this error with the test below. The reason is in 
> ARRAY_CONTAINS:
> {code:java}
> // If the needle type is MAP ... NOT NULL and the array has a null element,
> // the element getter below can only handle non-null values, so deserializing
> // the null element throws the AssertionError:
> elementGetter = ArrayData.createElementGetter(needleDataType.getLogicalType());
> {code}
>  
> {code:java}
> Stream<TestSetSpec> getTestSetSpecs() {
>     return Stream.of(
>             TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_CONTAINS)
>                     .onFieldsWithData(
>                             new Map[] {
>                                 null,
>                                 CollectionUtil.map(entry(1, "a"), entry(2, "b")),
>                                 CollectionUtil.map(entry(3, "c"), entry(4, "d")),
>                             },
>                             null)
>                     .andDataTypes(
>                             DataTypes.ARRAY(
>                                     DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
>                             DataTypes.STRING())
>                     .testResult(
>                             $("f0").arrayContains(
>                                     CollectionUtil.map(entry(3, "c"), entry(4, "d"))),
>                             "ARRAY_CONTAINS(f0, MAP[3, 'c', 4, 'd'])",
>                             true,
>                             DataTypes.BOOLEAN()));
> }
> {code}
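>
> A plausible direction for a fix (an assumption, not the merged patch; names 
> such as haystackDataType, array, and pos are illustrative) is to derive the 
> element getter from the haystack array's element type, whose nullability 
> reflects the actual data, instead of from the needle's type:
> {code:java}
> // Sketch: build the getter from the array's element type so that
> // getElementOrNull can legitimately return null for null elements.
> LogicalType elementType =
>         ((ArrayType) haystackDataType.getLogicalType()).getElementType();
> ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
> Object element = elementGetter.getElementOrNull(array, pos); // may be null
> {code}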



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32525) Update commons-beanutils to 1.9.4

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32525:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is truly Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Update commons-beanutils to 1.9.4
> -
>
> Key: FLINK-32525
> URL: https://issues.apache.org/jira/browse/FLINK-32525
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Deployment / YARN
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> YARN still tests with commons-beanutils 1.8.3, with a remark that beanutils 
> 1.9+ doesn't work with Hadoop. However, Hadoop 2.10.2 (which is our minimum 
> supported version) uses beanutils 1.9.4 itself, per 
> https://github.com/apache/hadoop/blob/rel/release-2.10.2/hadoop-project/pom.xml#L861-L863



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28881) PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream test failure

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28881:
---
Labels: stale-major test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is truly Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream
>  test failure
> ---
>
> Key: FLINK-28881
> URL: https://issues.apache.org/jira/browse/FLINK-28881
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub
>Affects Versions: 1.16.0, gcp-pubsub-3.0.0
>Reporter: Leonard Xu
>Priority: Major
>  Labels: stale-major, test-stability
>
> {code:java}
> 2022-08-09T03:14:22.0113691Z Aug 09 03:14:22 [ERROR] 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream
>   Time elapsed: 1.867 s  <<< FAILURE!
> 2022-08-09T03:14:22.0114504Z Aug 09 03:14:22 java.lang.AssertionError: 
> 2022-08-09T03:14:22.0114903Z Aug 09 03:14:22 
> 2022-08-09T03:14:22.0115263Z Aug 09 03:14:22 Expected: <[1, 2, 3]>
> 2022-08-09T03:14:22.0115679Z Aug 09 03:14:22  but: was <[1, 2]>
> 2022-08-09T03:14:22.0116232Z Aug 09 03:14:22  at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> 2022-08-09T03:14:22.0116871Z Aug 09 03:14:22  at 
> org.junit.Assert.assertThat(Assert.java:964)
> 2022-08-09T03:14:22.0117580Z Aug 09 03:14:22  at 
> org.junit.Assert.assertThat(Assert.java:930)
> 2022-08-09T03:14:22.0118460Z Aug 09 03:14:22  at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream(PubSubConsumingTest.java:119)
> {code}
> CI link: 
> https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713=logs=3796201e-ea88-5776-0ea8-9ccca648a70c=8ca54b76-085e-5cf1-8060-2c500a258258



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26115) Multiple Kafka connector tests failed due to The topic metadata failed to propagate to Kafka broker

2023-09-02 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-26115:
---
Labels: auto-deprioritized-critical auto-deprioritized-major stale-minor 
test-stability  (was: auto-deprioritized-critical auto-deprioritized-major 
test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label, or in 7 days the issue will be deprioritized.


> Multiple Kafka connector tests failed due to The topic metadata failed to 
> propagate to Kafka broker
> ---
>
> Key: FLINK-26115
> URL: https://issues.apache.org/jira/browse/FLINK-26115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.5, 1.14.3, 1.15.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor, test-stability
>
> This issue tracks all the related issues with "The topic metadata failed to 
> propagate to Kafka broker".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Samrat002 commented on pull request #21458: [FLINK-28513] Fix Flink Table API CSV streaming sink throws SerializedThrowable exception

2023-09-02 Thread via GitHub


Samrat002 commented on PR #21458:
URL: https://github.com/apache/flink/pull/21458#issuecomment-1703902505

   I have taken an example where a datagen table is created with two fields, 
`fname` and `lname`. I also created another table of type filesystem, which 
points to a specific S3 path and uses `csv` as the format. 
   
   ```sql
   -- create a generator table
   CREATE TABLE generator (
       fname STRING,
       lname STRING
   ) WITH (
       'connector' = 'datagen'
   );

   -- create a sample dynamic table with the filesystem connector,
   -- which supports csv as a format
   CREATE TABLE `name_table` (
       `fname` STRING,
       `lname` STRING
   ) WITH (
       'connector' = 'filesystem',
       'format' = 'csv',
       'path' = 's3://dbsamrat-flink-dev/data/default/name_table'
   );

   -- run a job to insert data into the table (s3)
   INSERT INTO name_table SELECT * FROM generator;
   ```
   
   Here is the flink-conf file used for the cluster (these configs are also 
picked up by the job):
   
   [flink-conf screenshot not preserved in this archive]
   
   Attaching the jobmanager log for the insertion of data into the 
CSV-formatted S3 path, which uses CsvBulkWriter and maintains a two-phase 
commit. 
   
[jobmanager.log](https://github.com/apache/flink/files/12504473/insert_jobmanager.log)
   
   
   It can be noted that the two-phase commit happens when a checkpoint is 
triggered. 
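   
   For reference, a minimal sketch of the checkpointing setup this behavior 
relies on (an assumption about the environment, not the exact configuration 
used here):
   
   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   // The filesystem/CSV sink only finalizes (commits) in-progress files when
   // a checkpoint completes, so checkpointing must be enabled.
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.enableCheckpointing(60_000); // trigger a checkpoint every 60 seconds
   ```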
   
   An additional job was executed separately to read the data back from 
name_table. 
   
[count_jobmanager.log](https://github.com/apache/flink/files/12504475/count_jobmanager.log)
   
   
   @dannycranmer @hlteoh37 please review whether this satisfies the 
exactly-once guarantee. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31685) Checkpoint job folder not deleted after job is cancelled

2023-09-02 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761487#comment-17761487
 ] 

Zakelly Lan commented on FLINK-31685:
-

[~masteryhx]  I think there are two issues within this problem:
 # Users may not need the job-id directory (to simplify the checkpoint 
directory layout, especially in CLAIM mode); I will create another ticket to 
address this.
 # The job-id directory should be deleted once all the checkpoint files are 
deleted (see the sketch below). Different from [~Wencong Liu]'s opinion, I 
think it is the {{CompletedCheckpointStore}}'s responsibility to delete the 
job-id directory, since it has the global view of whether the directory is 
still needed by any other checkpoint.
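
A rough sketch of that idea (names like {{checkpointsBaseDirectory}} are 
illustrative; this is not a proposed patch):
{code:java}
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// After the store discards the last completed checkpoint during shutdown,
// remove the now-empty job-id directory as well.
Path jobCheckpointDir = new Path(checkpointsBaseDirectory, jobId.toString());
FileSystem fs = jobCheckpointDir.getFileSystem();
if (fs.exists(jobCheckpointDir)) {
    FileStatus[] remaining = fs.listStatus(jobCheckpointDir);
    if (remaining == null || remaining.length == 0) {
        fs.delete(jobCheckpointDir, false); // only removes the empty folder
    }
}
{code}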

WDYT?

> Checkpoint job folder not deleted after job is cancelled
> 
>
> Key: FLINK-31685
> URL: https://issues.apache.org/jira/browse/FLINK-31685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When flink job is being checkpointed, and after the job is cancelled, the 
> checkpoint is indeed deleted (as per 
> {{{}execution.checkpointing.externalized-checkpoint-retention: 
> DELETE_ON_CANCELLATION{}}}), but the job-id folder still remains:
>  
> [sergio@flink-cluster-54f7fc7c6-k6km8 JobCheckpoints]$ ls
> 01eff17aa2910484b5aeb644bc531172  3a59309ef018541fc0c20856d0d89855  
> 78ff2344dd7ef89f9fbcc9789fc0cd79  a6fd7cec89c0af78c3353d4a46a7d273  
> dbc957868c08ebeb100d708bbd057593
> 04ff0abb9e860fc85f0e39d722367c3c  3e09166341615b1b4786efd6745a05d6  
> 79efc000aa29522f0a9598661f485f67  a8c42bfe158abd78ebcb4adb135de61f  
> dc8e04b02c9d8a1bc04b21d2c8f21f74
> 05f48019475de40230900230c63cfe89  3f9fb467c9af91ef41d527fe92f9b590  
> 7a6ad7407d7120eda635d71cd843916a  a8db748c1d329407405387ac82040be4  
> dfb2df1c25056e920d41c94b659dcdab
> 09d30bc0ff786994a6a3bb06abd3  455525b76a1c6826b6eaebd5649c5b6b  
> 7b1458424496baaf3d020e9fece525a4  aa2ef9587b2e9c123744e8940a66a287
> All folders in the above list, like {{01eff17aa2910484b5aeb644bc531172}}, are 
> empty.
>  
> *Expected behaviour:*
> The job-id folder should also be deleted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #253: [FLINK-33003] Support isolation forest algorithm in Flink ML

2023-09-02 Thread via GitHub


lindong28 commented on code in PR #253:
URL: https://github.com/apache/flink-ml/pull/253#discussion_r1313857237


##
flink-ml-lib/src/main/java/org/apache/flink/ml/anomalydetection/isolationforest/IsolationForestParams.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.anomalydetection.isolationforest;
+
+import org.apache.flink.ml.common.param.HasDistanceMeasure;
+import org.apache.flink.ml.common.param.HasFeaturesCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasWindows;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+
+/**
+ * Params of {@link IsolationForestModel}.
+ *
+ * @param <T> The class of this instance.
+ */
+public interface IsolationForestParams<T>
+        extends HasDistanceMeasure<T>,
+                HasFeaturesCol<T>,
+                HasPredictionCol<T>,
+                HasWindows<T> {
+    Param<Integer> TREES_NUMBER =
+            new IntParam("treesNumber", "The max number of trees to create.", 2);
+
+    Param<Integer> ITERS =

Review Comment:
   Would it be simpler to re-use HasMaxIter?
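   
   For illustration, reusing the shared mixin could look like the following 
sketch (assuming `org.apache.flink.ml.common.param.HasMaxIter`; not a 
definitive refactor):
   
   ```java
   // The iteration-count parameter comes from HasMaxIter instead of a
   // hand-rolled ITERS parameter.
   public interface IsolationForestParams<T>
           extends HasDistanceMeasure<T>,
                   HasFeaturesCol<T>,
                   HasPredictionCol<T>,
                   HasWindows<T>,
                   HasMaxIter<T> {
       Param<Integer> TREES_NUMBER =
               new IntParam("treesNumber", "The max number of trees to create.", 2);
   }
   ```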



##
flink-ml-lib/src/main/java/org/apache/flink/ml/anomalydetection/isolationforest/IsolationForest.java:
##
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.anomalydetection.isolationforest;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBody;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationConfig;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.ReplayableDataStreamList;
+import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.iteration.ForwardInputsOfLastRound;
+import org.apache.flink.ml.common.iteration.TerminateOnMaxIter;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import 

[GitHub] [flink] flinkbot commented on pull request #23348: [FLINK-5279] Print state name and type in error message when trying to access keyed state in non-keyed operator

2023-09-02 Thread via GitHub


flinkbot commented on PR #23348:
URL: https://github.com/apache/flink/pull/23348#issuecomment-1703860991

   
   ## CI report:
   
   * a9ed979e1ac92962d30a9221bcbca4609b7ae729 UNKNOWN
   
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly opened a new pull request, #23348: [FLINK-5279] Print state name and type in error message when trying to access keyed state in non-keyed operator

2023-09-02 Thread via GitHub


Zakelly opened a new pull request, #23348:
URL: https://github.com/apache/flink/pull/23348

   ## What is the purpose of the change
   
   Improve the error message when trying to access keyed state in a non-keyed 
operator by printing the state name and state type, making it easier for 
users to identify the problem.
   
   ## Brief change log
   
   Modify the error message to include the state name and state type, as 
sketched below.
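   
   A rough sketch of the intended shape of the message (illustrative wording; 
not necessarily the exact text in the patch):
   
   ```java
   // Hypothetical guard where keyed state is requested without a key
   // serializer configured:
   if (keySerializer == null) {
       throw new RuntimeException(
               String.format(
                       "Keyed state '%s' with type %s can only be used on a "
                               + "'keyed stream', i.e., after a 'keyBy()' operation.",
                       stateDescriptor.getName(), stateDescriptor.getType()));
   }
   ```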
   
   
   ## Verifying this change
   
   This change is a trivial rework, and the new error message can be validated 
by `CoBroadcastWithNonKeyedOperatorTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-5279) Improve error message when trying to access keyed state in non-keyed operator

2023-09-02 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761455#comment-17761455
 ] 

Zakelly Lan commented on FLINK-5279:


I would like to take this trivial work.

> Improve error message when trying to access keyed state in non-keyed operator
> -
>
> Key: FLINK-5279
> URL: https://issues.apache.org/jira/browse/FLINK-5279
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.1.3
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> When trying to access keyed state in a non-keyed operator, the error message 
> is not very helpful. You get a trace like this:
> {code}
> java.lang.RuntimeException: Error while getting state
> ...
> Caused by: java.lang.RuntimeException: State key serializer has not been 
> configured in the config. This operation cannot use partitioned state.
> {code}
> It would be helpful if the message were more explicit, stating that the API 
> can only be used on keyed streams, etc.
> If this applies to the current master as well, we should fix it there, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22059) add a new option is rocksdb statebackend to enable job threads setting

2023-09-02 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761454#comment-17761454
 ] 

Zakelly Lan commented on FLINK-22059:
-

Hi [~zhoujira86] , are you still working on this issue?

> add a new option is rocksdb statebackend to enable job threads setting
> --
>
> Key: FLINK-22059
> URL: https://issues.apache.org/jira/browse/FLINK-22059
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.2
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, stale-assigned
> Fix For: 1.18.0
>
>
> As discussed in FLINK-21688, we currently use the setIncreaseParallelism 
> function to set the number of RocksDB's working threads. 
>  
> Can we enable another setting key to set RocksDB's max background jobs, which 
> would also allow a larger number of flush threads? See the sketch below.
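>
> For illustration, the RocksDB option such a key could map to (the Flink 
> configuration key name itself is an open question, not part of this sketch):
> {code:java}
> import org.rocksdb.DBOptions;
>
> DBOptions options = new DBOptions();
> // setMaxBackgroundJobs sizes the combined flush + compaction thread pool,
> // whereas setIncreaseParallelism mainly raises compaction parallelism.
> options.setMaxBackgroundJobs(8);
> {code}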



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-5925) Clean up extracted RocksDB JNI library

2023-09-02 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761449#comment-17761449
 ] 

Zakelly Lan edited comment on FLINK-5925 at 9/2/23 10:05 AM:
-

IIUC, this has been addressed by FLIP-198.


was (Author: zakelly):
IIUC, this has been addressed by FLIP-198. So I close this.

> Clean up extracted RocksDB JNI library
> --
>
> Key: FLINK-5925
> URL: https://issues.apache.org/jira/browse/FLINK-5925
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The {{RocksDBStateBackend}} extracts the RocksDB JNI library from the RocksDB 
> dependency into a temp directory (see 
> {{RocksDBStateBackend#ensureRocksDBIsLoaded}}). This file is, however, never 
> removed. 
> In general, I think we should add a cleanup mechanism which cleans all 
> {{Task}} specific files after the {{Task}} has completed. The same applies to 
> the {{TaskManager}}.
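>
> A minimal sketch of such a cleanup mechanism (illustrative only, including 
> the method name; FLIP-198, mentioned above, addresses this differently via a 
> managed working directory):
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.nio.file.Files;
>
> static File createExtractionDirWithCleanup() throws IOException {
>     File tempDir = Files.createTempDirectory("rocksdb-jni-").toFile();
>     // Remove the extracted native library files when the JVM exits.
>     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>         File[] files = tempDir.listFiles();
>         if (files != null) {
>             for (File f : files) {
>                 f.delete();
>             }
>         }
>         tempDir.delete();
>     }));
>     return tempDir;
> }
> {code}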



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-5925) Clean up extracted RocksDB JNI library

2023-09-02 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761449#comment-17761449
 ] 

Zakelly Lan commented on FLINK-5925:


IIUC, this has been addressed by FLIP-198. So I close this.

> Clean up extracted RocksDB JNI library
> --
>
> Key: FLINK-5925
> URL: https://issues.apache.org/jira/browse/FLINK-5925
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The {{RocksDBStateBackend}} extracts the RocksDB JNI library from the RocksDB 
> dependency into a temp directory (see 
> {{RocksDBStateBackend#ensureRocksDBIsLoaded}}). This file is, however, never 
> removed. 
> In general, I think we should add a cleanup mechanism which cleans all 
> {{Task}} specific files after the {{Task}} has completed. The same applies to 
> the {{TaskManager}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30812) YARN with S3 resource storage fails for Hadoop 3.3.2

2023-09-02 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi closed FLINK-30812.
-

> YARN with S3 resource storage fails for Hadoop 3.3.2
> 
>
> Key: FLINK-30812
> URL: https://issues.apache.org/jira/browse/FLINK-30812
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> In HADOOP-17139, S3AFileSystem#copyFromLocalFile was refactored and now 
> expects the local source Hadoop Path object to have a scheme specified, which 
> the YarnClusterDescriptor uploading the local files won't have.
> When uploading files to S3, CopyFromLocalOperation#getFinalPath compares the 
> passed source Hadoop Path with the file it found (which will have the file:// 
> scheme) using URI.relativize, but this fails because of the scheme difference 
> and throws a PathIOException, as can be seen in this exception:
>  
> {code:java}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn Application Cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]        ..
> Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path 
> for 
> URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
>  Input/output error
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[hadoop-common-3.3.3.jar!/:?]
>   at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>  ~[hadoop-common-3.3.3.jar!/:?]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   ... 35 more {code}
>  
> Possibly the easiest solution would be to add the file:// scheme in 
> YarnApplicationFileUploader#copyToRemoteApplicationDir.
> The other solution would be to change all calls uploading local files to use 
> "new Path(file.toURI())" instead of "new Path(file.getAbsolutePath())", but it 
> might not be as future-proof as the first solution.
> Email thread: 
> [https://lists.apache.org/thread/oo5rlyo3jr7kds2y6wwnfo1yhnk0fx4c]
>  
> If a committer can assign this ticket to me, I can start working on it.
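>
> To illustrate the second option (a sketch of the difference, not the merged 
> fix; the file name is hypothetical):
> {code:java}
> import java.io.File;
> import org.apache.hadoop.fs.Path;
>
> File localFile = new File("/tmp/flink-conf.yaml");
> // Carries the scheme, so CopyFromLocalOperation#getFinalPath can relativize it:
> Path withScheme = new Path(localFile.toURI());          // file:///tmp/flink-conf.yaml
> // No scheme; relativizing against a file:// URI fails:
> Path withoutScheme = new Path(localFile.getAbsolutePath()); // /tmp/flink-conf.yaml
> {code}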



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30812) YARN with S3 resource storage fails for Hadoop 3.3.2

2023-09-02 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi resolved FLINK-30812.
---
Fix Version/s: 1.19.0
   Resolution: Fixed

9507dd6 on master

> YARN with S3 resource storage fails for Hadoop 3.3.2
> 
>
> Key: FLINK-30812
> URL: https://issues.apache.org/jira/browse/FLINK-30812
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> In HADOOP-17139, S3AFileSystem#copyFromLocalFile was refactored and now 
> expects the local source Hadoop Path object to have a scheme specified, which 
> the YarnClusterDescriptor uploading the local files won't have.
> When uploading files to S3, CopyFromLocalOperation#getFinalPath compares the 
> passed source Hadoop Path with the file it found (which will have the file:// 
> scheme) using URI.relativize, but this fails because of the scheme difference 
> and throws a PathIOException, as can be seen in this exception:
>  
> {code:java}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn Application Cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]        ..
> Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path 
> for 
> URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
>  Input/output error
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[hadoop-common-3.3.3.jar!/:?]
>   at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>  ~[hadoop-common-3.3.3.jar!/:?]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
>  ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471)
>  ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   ... 35 more {code}
>  
> Possibly the easiest solution would be to add the file:// scheme in 
> YarnApplicationFileUploader#copyToRemoteApplicationDir.
> The other solution would be to change all calls uploading local files to use 
> "new Path(file.toURI())" instead of "new Path(file.getAbsolutePath())", but it 
> might not be as future-proof as the first solution.
> Email thread: 
> [https://lists.apache.org/thread/oo5rlyo3jr7kds2y6wwnfo1yhnk0fx4c]
>  
> If a committer can assign this ticket to me, I can start working on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] gaborgsomogyi merged pull request #21788: [FLINK-30812][yarn] Fix uploading local files when using YARN with S3

2023-09-02 Thread via GitHub


gaborgsomogyi merged PR #21788:
URL: https://github.com/apache/flink/pull/21788


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30530) Flink configuration from user-provided ConfigMap

2023-09-02 Thread Arseniy Tashoyan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761442#comment-17761442
 ] 

Arseniy Tashoyan commented on FLINK-30530:
--

I would say that the default config is a different thing, so this feature would 
be a good addition.

> Flink configuration from user-provided ConfigMap
> 
>
> Key: FLINK-30530
> URL: https://issues.apache.org/jira/browse/FLINK-30530
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
> Environment: Flink 1.15.2
> Flink Kubernetes operator 1.2.0
>Reporter: Arseniy Tashoyan
>Priority: Major
>
> Currently the Flink configuration can be specified in the YAML descriptor of 
> FlinkDeployment via the _flinkConfiguration_ setting:
> {code:yaml}
> flinkConfiguration: 
>   taskmanager.numberOfTaskSlots: "2"
>   ...
> {code}
> Same for the logging configuration:
> {code:yaml}
> logConfiguration: 
>     "log4j-console.properties": |
>       rootLogger.level = DEBUG
>       ...{code}
> This makes the YAML descriptor overloaded and huge. In addition, the Flink and 
> logging configuration may differ between applications, while the Kubernetes 
> settings may be the same for all applications. Therefore it makes sense to 
> extract the Flink and logging configurations from the YAML descriptor.
> This can be done via a user-provided ConfigMap:
> {code:yaml}
> flinkConfigMap: basic-example-flink-config
> {code}
> In this example we have a Flink application _basic-example_. The 
> _basic-example-flink-config_ ConfigMap contains all config files used by 
> Flink: flink-conf.yaml, log4j-console.properties, and possibly other files. The 
> content of this ConfigMap gets mounted as a volume at _/opt/flink/conf_.
> Therefore we can have different Flink settings for different applications and 
> the same YAML descriptor for all of them (only the value of _flinkConfigMap_ 
> differs).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)