[jira] [Created] (FLINK-28700) Table store sink fails to commit for Flink 1.14 batch job
Caizhi Weng created FLINK-28700: --- Summary: Table store sink fails to commit for Flink 1.14 batch job Key: FLINK-28700 URL: https://issues.apache.org/jira/browse/FLINK-28700 Project: Flink Issue Type: Bug Components: Table Store Affects Versions: table-store-0.2.0, table-store-0.3.0 Reporter: Caizhi Weng Fix For: table-store-0.2.0, table-store-0.3.0 We can't get configurations from DummyStreamExecutionEnvironment in Flink 1.14 (this is fixed by FLINK-26709 in Flink 1.15) so we have to use Java reflection to check if this execution environment is for batch job or for streaming job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
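A minimal sketch of the reflection approach mentioned in the issue, under stated assumptions: `RealEnv`, `DummyEnv` and the field names `realEnv` and `configuration` are hypothetical stand-ins and not Flink's actual classes or fields. Only the option key `execution.runtime-mode` is a real Flink setting, and here it is used as a plain map key.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class RuntimeModeProbe {

    /** Hypothetical stand-in for an environment that keeps its configuration private. */
    static class RealEnv {
        private final Map<String, String> configuration = new HashMap<>();

        RealEnv(String runtimeMode) {
            configuration.put("execution.runtime-mode", runtimeMode);
        }
    }

    /** Hypothetical stand-in for a wrapper that exposes no getter for the wrapped environment. */
    static class DummyEnv {
        private final RealEnv realEnv;

        DummyEnv(RealEnv realEnv) {
            this.realEnv = realEnv;
        }
    }

    /** Reads a private field declared directly on the target's class. */
    static Object readPrivateField(Object target, String fieldName)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
    }

    /** Returns true if the wrapped environment is configured for batch execution. */
    @SuppressWarnings("unchecked")
    static boolean isBatch(DummyEnv env) throws ReflectiveOperationException {
        Object realEnv = readPrivateField(env, "realEnv");
        Map<String, String> conf =
                (Map<String, String>) readPrivateField(realEnv, "configuration");
        return "BATCH".equalsIgnoreCase(conf.get("execution.runtime-mode"));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isBatch(new DummyEnv(new RealEnv("BATCH"))));
        System.out.println(isBatch(new DummyEnv(new RealEnv("STREAMING"))));
    }
}
```

Reflection of this kind is brittle because it depends on private field names, which is presumably why the issue treats it as a workaround for Flink 1.14 only.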
[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint
fsk119 commented on code in PR #20298: URL: https://github.com/apache/flink/pull/20298#discussion_r930644202 ## flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q: ## @@ -0,0 +1,121 @@ +# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# == +# test hive catalog +# == + +show current catalog; +!output ++--+ +| current catalog name | ++--+ +| hive | ++--+ +1 row in set +!ok + +show databases; +!output ++---+ +| database name | ++---+ +| default | ++---+ +1 row in set +!ok + +show tables; +!output +Empty set +!ok + +create database additional_test_database; +!output +++ +| result | +++ +| OK | +++ +1 row in set +!ok + +use additional_test_database; +!output +++ +| result | +++ +| OK | +++ +1 row in set +!ok + +create table param_types_table ( +dec DECIMAL(10, 10), +ch CHAR(5), +vch VARCHAR(15) +); +!output +++ +| result | +++ +| OK | +++ +1 row in set +!ok + +show tables; +!output ++---+ +|table name | ++---+ +| param_types_table | ++---+ +1 row in set +!ok + +show current database; +!output ++--+ +|current database name | ++--+ +| additional_test_database | ++--+ +1 row in set +!ok + +# == +# test hive table with parameterized types +# == + +describe hive.additional_test_database.param_types_table; +!output ++--+-+--+-++---+ +| name |type | null | key | extras | watermark | Review Comment: Yes. Maybe we should introduce a hive-style describe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource
xinbinhuang commented on PR #20289: URL: https://github.com/apache/flink/pull/20289#issuecomment-1196291153

@tweise Thank you so much for reviewing the PR! I just realized that I might have misread the Jira issue as `HybridSource: Support dynamic stop position in HybridSource` instead of `HybridSource: Support dynamic stop position in FileSource`. So this PR actually aims to design a _**generic interface that allows any source to participate in dynamic source switching**_. With that in mind, let me explain how I came up with the current design and implementation.

After reviewing the current logic of the hybrid source (amazing work!!), I understand that the current implementation supports transferring the end position to the next enumerator. However, it lacks a mechanism for finding out where the end position is (e.g. the offset for a Kafka partition). These "end positions" are probably unknown beforehand; otherwise this would be the same as a [fixed start position](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#fixed-start-position-at-graph-construction-time). Therefore, I think the key is to transfer the "end position" from the source reader to the enumerator during source switch.

There are a few points to consider:

1. What would be the "end position" to transfer to the next source? I believe this varies by use case. Some may find it enough to use `file.path`, while others may need to derive the timestamp or offset from the content of the file (e.g. a Kafka archive, where the implementation can vary by company). Since we can't anticipate all use cases, passing all finished splits seems to be a reasonable solution here, letting the developer decide how to derive the position from them.
2. Where to make the changes? I aimed to implement this such that most existing sources can benefit from it out of the box, with minimal changes and no breaking changes to them.
3. How to store the "finished splits" before source switch?
Per FLIP-27, the enumerator only knows which splits to consume but not the actual progress; only the source reader knows that. So we need to store the finished splits and transfer them to the enumerator during source switch. However, most existing sources extend `SourceReaderBase`, which [purges them from state](https://github.com/apache/flink/blob/3e2620bc785b3b4d82f1f188eb7b1e0e129b14d3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L193-L194) once it reaches the end position. One naive solution would be to adjust `SourceReaderBase` to also store finished splits in state. However, this would affect all sources running in production and is probably too big a backward-incompatible change to make right away. Therefore, I decided to store them only in the `HybridSourceReader`; the existing sources only need to implement one method (`DynamicHybridSourceReader::getFinishedSplits`) that allows the finished splits to be extracted during checkpoint and source switch. This process is transparent to all existing sources and only happens when they are used with the `HybridSource`.

With the above points, the current implementation works as follows:

- On each checkpoint, `HybridSourceReader` retrieves the finished states (marked with `HybridSourceSplit.isFinished = true`) from the underlying reader and checkpoints them for persistence along with the unfinished states.
- Upon source switch, `HybridSourceReader` sends all the finished splits in `SourceReaderFinishedEvent` to the enumerator.
- The enumerator passes those finished splits along in `SourceSwitchContext` to the next source, which can then use the splits to derive the start positions.

Changes required on existing non-hybrid sources:

- Implement `DynamicHybridSourceReader::getFinishedSplits` on `SourceReaderBase`.

API changes on `HybridSource`:

- Added `SourceSwitchContext::getPreviousSplits`, which returns finished splits from the previous source.
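The hand-off described above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: the names `DynamicHybridSourceReader`, `getFinishedSplits`, `SourceSwitchContext` and `getPreviousSplits` come from the comment, but their signatures here are guesses, and `Split`, `ToyReader`, `onReaderFinished` and `deriveStartPosition` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FinishedSplitHandOff {

    /** Simplified split: an id plus the last position the reader reached (illustrative). */
    static final class Split {
        final String id;
        final long endPosition;

        Split(String id, long endPosition) {
            this.id = id;
            this.endPosition = endPosition;
        }
    }

    /** Name taken from the comment; the signature is an assumption. */
    interface DynamicHybridSourceReader {
        List<Split> getFinishedSplits();
    }

    /** Name taken from the comment; the signature is an assumption. */
    interface SourceSwitchContext {
        List<Split> getPreviousSplits();
    }

    /** Toy reader that remembers the splits it has finished instead of purging them. */
    static final class ToyReader implements DynamicHybridSourceReader {
        private final List<Split> finished = new ArrayList<>();

        void finish(Split split) {
            finished.add(split);
        }

        @Override
        public List<Split> getFinishedSplits() {
            return Collections.unmodifiableList(finished);
        }
    }

    /** Stand-in for the enumerator side: wraps the splits reported at source switch. */
    static SourceSwitchContext onReaderFinished(DynamicHybridSourceReader reader) {
        final List<Split> copy = new ArrayList<>(reader.getFinishedSplits());
        return () -> copy;
    }

    /** One possible derivation: the next source starts from the largest end position seen. */
    static long deriveStartPosition(SourceSwitchContext context) {
        long start = 0L;
        for (Split split : context.getPreviousSplits()) {
            start = Math.max(start, split.endPosition);
        }
        return start;
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        reader.finish(new Split("file-1", 120L));
        reader.finish(new Split("file-2", 345L));
        System.out.println(deriveStartPosition(onReaderFinished(reader))); // prints 345
    }
}
```

Taking the maximum end position is only one way to derive a start position; as the comment notes, the derivation is deliberately left to the developer of the next source.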
It's a lot of words, so I really appreciate your patience in reading this. Let me know if anything is unclear; I'm happy to chat more about this!
[jira] [Closed] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener
[ https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-28696. --- Resolution: Fixed Fixed via bbaeb628f48a4bc4c324bfc4afd06ddf34f546f0 > Notify all newlyAdded/Merged blocked nodes to BlocklistListener > --- > > Key: FLINK-28696 > URL: https://issues.apache.org/jira/browse/FLINK-28696 > Project: Flink > Issue Type: Sub-task >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > This bug was introduced by FLINK-28660. With our newly added logic, the blocklist listener is not notified when there are no newly added nodes (only merged nodes).
[GitHub] [flink] zhuzhurk closed pull request #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes)
zhuzhurk closed pull request #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes) URL: https://github.com/apache/flink/pull/20368
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275: URL: https://github.com/apache/flink/pull/20275#discussion_r930593185 ## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ## @@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal( } } +private boolean closeGateways( +final long checkpointId, final Set subtasksToCheckpoint) { +boolean hasCloseableGateway = false; +for (int subtask : subtasksToCheckpoint) { +SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask); +if (!gateway.tryCloseGateway(checkpointId)) { Review Comment: If we do want to capture that bug, how about we consistently call `tryCloseGateway()` for every subtask and check the condition at the end of the loop, instead of stopping at the first subtask whose `tryCloseGateway()` returns false? Otherwise, if `subtask_1.tryCloseGateway() == false` and `subtask_2.tryCloseGateway() == true`, the method will not throw an exception. This makes the method's behavior harder to understand. ## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java: ## @@ -18,20 +18,16 @@ package org.apache.flink.runtime.operators.coordination; -import org.apache.flink.runtime.messages.Acknowledge; - -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; +/** + * An {@link OperatorEvent} sent from a subtask to its {@link OperatorCoordinator} to signal that + * the checkpoint of an individual task is completed. + */ +public class AcknowledgeCheckpointEvent implements OperatorEvent { -/** Simple interface for a component that takes and sends events. */ -@FunctionalInterface -interface EventSender { +/** The ID of the checkpoint that this event is related to. */ +final long checkpointId; Review Comment: Is there any existing example where we make a variable protected and do not provide an accessor method for this variable?
If no, it seems better to keep code style consistent with existing code. ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java: ## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import
[GitHub] [flink-ml] taosiyuan163 commented on pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test
taosiyuan163 commented on PR #132: URL: https://github.com/apache/flink-ml/pull/132#issuecomment-1196269416 Hi @zhipeng93, I updated the following according to your comments: 1. Optimize the implementation for several operators. 2. Reformat the document. 3. Fix the test cases. 4. Add validation for the size of the `inputs`.
[jira] [Resolved] (FLINK-24434) PyFlink YARN per-job on Docker test fails on Azure
[ https://issues.apache.org/jira/browse/FLINK-24434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo resolved FLINK-24434. -- Resolution: Fixed Since https://issues.apache.org/jira/browse/FLINK-28680 has been resolved, I think we can close the JIRA. > PyFlink YARN per-job on Docker test fails on Azure > -- > > Key: FLINK-24434 > URL: https://issues.apache.org/jira/browse/FLINK-24434 > Project: Flink > Issue Type: Bug > Components: API / Python, Deployment / YARN >Affects Versions: 1.14.4, 1.15.0, 1.16.0 >Reporter: Dawid Wysakowicz >Assignee: Huang Xingbo >Priority: Critical > Labels: stale-major, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24669=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=23186 > {code} > Sep 30 18:20:22 Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): > Permission denied: user=mapred, access=WRITE, > inode="/":hdfs:hadoop:drwxr-xr-x > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:318) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:219) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1663) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1647) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1606) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3039) > Sep 30 18:20:22 at > 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1079) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447) > Sep 30 18:20:22 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793) > Sep 30 18:20:22 at java.security.AccessController.doPrivileged(Native > Method) > Sep 30 18:20:22 at javax.security.auth.Subject.doAs(Subject.java:422) > Sep 30 18:20:22 at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489) > Sep 30 18:20:22 > Sep 30 18:20:22 at > org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489) > Sep 30 18:20:22 at org.apache.hadoop.ipc.Client.call(Client.java:1435) > Sep 30 18:20:22 at org.apache.hadoop.ipc.Client.call(Client.java:1345) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > Sep 30 18:20:22 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > Sep 30 18:20:22 at com.sun.proxy.$Proxy12.mkdirs(Unknown Source) > Sep 30 18:20:22 at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:583) > Sep 30 18:20:22 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Sep 30 18:20:22 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Sep 30 
18:20:22 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Sep 30 18:20:22 at java.lang.reflect.Method.invoke(Method.java:498) > Sep 30 18:20:22 at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > Sep 30 18:20:22 at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > Sep 30 18:20:22 at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > Sep 30 18:20:22 at >
[jira] [Assigned] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener
[ https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-28696: --- Assignee: Lijie Wang > Notify all newlyAdded/Merged blocked nodes to BlocklistListener > --- > > Key: FLINK-28696 > URL: https://issues.apache.org/jira/browse/FLINK-28696 > Project: Flink > Issue Type: Sub-task >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > This bug was introduced by FLINK-28660. With our newly added logic, the blocklist listener is not notified when there are no newly added nodes (only merged nodes).
[jira] [Assigned] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-28698: Assignee: Junhan Yang > The display order of aggregated metrics should follow the order of task state > transitions > - > > Key: FLINK-28698 > URL: https://issues.apache.org/jira/browse/FLINK-28698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.0 >Reporter: Lijie Wang >Assignee: Junhan Yang >Priority: Minor > Attachments: image-2022-07-27-10-00-49-345.png > > > !image-2022-07-27-10-00-49-345.png|width=921,height=382! > Currently, the display order of task state durations is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order of task state transitions.
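A small sketch of the proposed ordering, assuming the state names are plain strings. The class and method names are hypothetical, and the real web frontend is not written in Java; this only illustrates sorting by transition order instead of by an arbitrary order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StateDisplayOrder {

    /** Transition order as proposed in the issue. */
    static final List<String> TRANSITION_ORDER =
            Arrays.asList("CREATED", "SCHEDULED", "DEPLOYING", "INITIALIZING", "RUNNING");

    /** Returns a sorted copy; names missing from TRANSITION_ORDER (index -1) sort first. */
    static List<String> sortByTransitionOrder(List<String> states) {
        List<String> sorted = new ArrayList<>(states);
        sorted.sort(Comparator.comparingInt(TRANSITION_ORDER::indexOf));
        return sorted;
    }

    public static void main(String[] args) {
        // The current display order reported in the issue.
        List<String> current =
                Arrays.asList("INITIALIZING", "CREATED", "SCHEDULED", "RUNNING", "DEPLOYING");
        System.out.println(sortByTransitionOrder(current));
    }
}
```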
[jira] [Created] (FLINK-28699) Native rocksdb full snapshot in non-incremental checkpointing
Lihe Ma created FLINK-28699: --- Summary: Native rocksdb full snapshot in non-incremental checkpointing Key: FLINK-28699 URL: https://issues.apache.org/jira/browse/FLINK-28699 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.15.1, 1.14.5 Reporter: Lihe Ma When the RocksDB state backend is used and state.backend.incremental is enabled, Flink identifies the SST files newly created by RocksDB during a checkpoint, whereas during a savepoint it reads all state from RocksDB and writes it to files [1]. When state.backend.incremental is disabled, Flink reads all state from RocksDB and generates state files for both checkpoints and savepoints [2]. This makes sense for savepoints, because a user can take a savepoint with the RocksDB state backend and then restore it using another state backend. For checkpoints, however, the deserialization and serialization of state cost performance. If native RocksDB snapshots were introduced for full snapshots, better performance could theoretically be achieved, while savepoints would remain the same as before. [1] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java [2] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571719#comment-17571719 ] Hangxiang Yu commented on FLINK-28653: -- Hi, maybe you could try calling {{env.getConfig().enableForceAvro()}} to force Avro, or {{env.getConfig().disableGenericTypes()}} to disable Kryo, and then see whether it works. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3.
`io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at >
[jira] [Assigned] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size
[ https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-28577: Assignee: Yu Chen > 1.15.1 web ui console report error about checkpoint size > > > Key: FLINK-28577 > URL: https://issues.apache.org/jira/browse/FLINK-28577 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.15.1 >Reporter: nobleyd >Assignee: Yu Chen >Priority: Major > > 1.15.1 > 1. start-cluster > 2. submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar > 3. trigger savepoint: ./bin/flink savepoint {jobId} ./sp0 > 4. open the web UI for the job and switch to the checkpoint tab; nothing is shown. > The Chrome console log shows an error: > main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null > (reading 'checkpointed_size') > at q (253.e9e8f2b56b4981f5.js:1:607974) > at Sl (main.a7e97c2f60a2616e.js:1:186068) > at Br (main.a7e97c2f60a2616e.js:1:184696) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at B8 (main.a7e97c2f60a2616e.js:1:191872)
[jira] [Commented] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size
[ https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571720#comment-17571720 ] Yun Tang commented on FLINK-28577: -- [~Yu Chen] Thanks for figuring out this problem. It seems we cannot have tests that cover web UI changes :(, so even typos could lead to such problems. I have already assigned this ticket to you; please go ahead. > 1.15.1 web ui console report error about checkpoint size > > > Key: FLINK-28577 > URL: https://issues.apache.org/jira/browse/FLINK-28577 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.15.1 >Reporter: nobleyd >Assignee: Yu Chen >Priority: Major > > 1.15.1 > 1. start-cluster > 2. submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar > 3. trigger savepoint: ./bin/flink savepoint {jobId} ./sp0 > 4. open the web UI for the job and switch to the checkpoint tab; nothing is shown. > The Chrome console log shows an error: > main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null > (reading 'checkpointed_size') > at q (253.e9e8f2b56b4981f5.js:1:607974) > at Sl (main.a7e97c2f60a2616e.js:1:186068) > at Br (main.a7e97c2f60a2616e.js:1:184696) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at N8 (main.a7e97c2f60a2616e.js:1:185128) > at Br (main.a7e97c2f60a2616e.js:1:185153) > at B8 (main.a7e97c2f60a2616e.js:1:191872)
[GitHub] [flink] zoltar9264 commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
zoltar9264 commented on code in PR #20152: URL: https://github.com/apache/flink/pull/20152#discussion_r930584225 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java: ## @@ -87,9 +87,14 @@ protected CheckpointableKeyedStateBackend restore( String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks(); ExecutionConfig executionConfig = env.getExecutionConfig(); +env.getAsyncOperationsThreadPool(); + ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory(); CheckpointableKeyedStateBackend keyedStateBackend = ChangelogBackendRestoreOperation.restore( +env.getJobID(), +env.getAsyncOperationsThreadPool(), +env.getTaskManagerInfo().getConfiguration(), Review Comment: Hi @fredia, thanks for the reply. I'm not suggesting passing PERIODIC_MATERIALIZATION_INTERVAL directly. StateChangelogStorage may have different implementations, each with different options. I think an implementation-specific configuration should not be exposed in the interface.
[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
[ https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571718#comment-17571718 ] Yun Tang commented on FLINK-28697: -- [~jmahonin] Did you use different flink versions between the client and server side? Or could you share the full exception stack trace? > MapDataSerializer doesn't declare a serialVersionUID > > > Key: FLINK-28697 > URL: https://issues.apache.org/jira/browse/FLINK-28697 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.15.1 >Reporter: Josh Mahonin >Priority: Major > Labels: pull-request-available > > MapDataSerializer doesn't declare a serialVersionUID, which can manifest as a > InvalidClassException when attempting to serialize with different JREs for > compilation / runtime. > {code:java} > Caused by: java.io.InvalidClassException: > org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class > incompatible: stream classdesc serialVersionUID = 2533002123505507000, local > class serialVersionUID = 1622156938509929811 {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
[ https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-28697: - Fix Version/s: (was: 1.15.1) > MapDataSerializer doesn't declare a serialVersionUID > > > Key: FLINK-28697 > URL: https://issues.apache.org/jira/browse/FLINK-28697 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.15.1 >Reporter: Josh Mahonin >Priority: Major > Labels: pull-request-available > > MapDataSerializer doesn't declare a serialVersionUID, which can manifest as a > InvalidClassException when attempting to serialize with different JREs for > compilation / runtime. > {code:java} > Caused by: java.io.InvalidClassException: > org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class > incompatible: stream classdesc serialVersionUID = 2533002123505507000, local > class serialVersionUID = 1622156938509929811 {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
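Editor's note on FLINK-28697: the ticket describes a serializable class without a declared serialVersionUID, so the JVM derives one from the class structure and the value can differ between the compiling and the running JRE. The following is a minimal sketch of the usual remedy, assuming plain Java serialization; `PinnedSerializer` and `UnpinnedSerializer` are hypothetical stand-ins and not Flink's MapDataSerializer, and the value `1L` is an arbitrary choice.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialVersionUidDemo {
    /** Returns the UID that Java serialization associates with the given class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("pinned:   " + uidOf(PinnedSerializer.class));
        // The unpinned value is computed from the class structure and may vary.
        System.out.println("unpinned: " + uidOf(UnpinnedSerializer.class));
    }
}

/** Hypothetical stand-in for a serializer class (not Flink's MapDataSerializer). */
class PinnedSerializer implements Serializable {
    // Explicit UID: the value written to the stream no longer depends on the compiler or JRE.
    private static final long serialVersionUID = 1L;
}

/** Same shape without a declared UID; the JVM derives one from the class structure. */
class UnpinnedSerializer implements Serializable {}
```

With the field declared, both sides of the stream agree on the UID as long as the declaration itself is unchanged, which is what avoids the InvalidClassException quoted in the ticket.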
[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax
Tartarus0zm commented on PR #20252: URL: https://github.com/apache/flink/pull/20252#issuecomment-1196221001 @flinkbot run azure
[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-28698:
-
Priority: Minor (was: Major)
> The display order of aggregated metrics should follow the order of task state transitions
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Web Frontend
> Affects Versions: 1.16.0
> Reporter: Lijie Wang
> Priority: Minor
> Attachments: image-2022-07-27-10-00-49-345.png
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
> Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order of task state transitions.
[jira] [Commented] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571715#comment-17571715 ]
Xintong Song commented on FLINK-28698:
--
Thanks [~wanglijie95], that makes sense to me. [~junhany], could you please take a look?
> The display order of aggregated metrics should follow the order of task state transitions
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Web Frontend
> Affects Versions: 1.16.0
> Reporter: Lijie Wang
> Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
> Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order of task state transitions.
[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-28698:
-
Affects Version/s: 1.16.0
> The display order of aggregated metrics should follow the order of task state transitions
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Web Frontend
> Affects Versions: 1.16.0
> Reporter: Lijie Wang
> Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
> Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order of task state transitions.
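Editor's note on FLINK-28698: the proposal amounts to sorting the displayed state durations by a fixed transition order rather than by whatever order the data arrives in. A minimal sketch of that idea follows, assuming the five state names from the ticket; the class and method names are hypothetical, and the real web frontend is not written in Java, so this only illustrates the ordering logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class TaskStateOrderDemo {
    /** Order proposed in the ticket; the names are copied from its description. */
    static final List<String> TRANSITION_ORDER =
            Arrays.asList("CREATED", "SCHEDULED", "DEPLOYING", "INITIALIZING", "RUNNING");

    /** Returns a copy of the given states sorted by their position in TRANSITION_ORDER. */
    static List<String> sortByTransitionOrder(List<String> states) {
        List<String> sorted = new ArrayList<>(states);
        sorted.sort(Comparator.comparingInt((String s) -> TRANSITION_ORDER.indexOf(s)));
        return sorted;
    }

    public static void main(String[] args) {
        // The display order reported as current in the ticket.
        List<String> current =
                Arrays.asList("INITIALIZING", "CREATED", "SCHEDULED", "RUNNING", "DEPLOYING");
        System.out.println(sortByTransitionOrder(current));
    }
}
```

Keeping the order in one list means the display follows the state machine even if a new state is added later; a state missing from the list would sort first here, since `indexOf` returns -1 for it.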
[GitHub] [flink] PatrickRen merged pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table
PatrickRen merged PR #20177: URL: https://github.com/apache/flink/pull/20177
[GitHub] [flink] LinMingQiang commented on pull request #19995: [FLINK-28060][BP 1.15][Connector/Kafka] Updated Kafka Clients to 3.1.1
LinMingQiang commented on PR #19995: URL: https://github.com/apache/flink/pull/19995#issuecomment-1196218666 I still have problems applying this patch on 1.15.
[GitHub] [flink] fredia commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
fredia commented on code in PR #20152: URL: https://github.com/apache/flink/pull/20152#discussion_r930579497
## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java: ##
@@ -87,9 +87,14 @@ protected CheckpointableKeyedStateBackend restore(
 String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
 ExecutionConfig executionConfig = env.getExecutionConfig();
+env.getAsyncOperationsThreadPool();
+
 ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
 CheckpointableKeyedStateBackend keyedStateBackend =
 ChangelogBackendRestoreOperation.restore(
+env.getJobID(),
+env.getAsyncOperationsThreadPool(),
+env.getTaskManagerInfo().getConfiguration(),
Review Comment:
> Do you mean changing StateChangelogStorageFactory interface and passing ExecutionConfig to createStorageView instead of Configuration?
I would rather pass `PERIODIC_MATERIALIZATION_INTERVAL` directly as a parameter instead of `xxConfiguration`.
> it should be `env.getJobConfiguration()` ideally merged with `env.getTaskManagerInfo().getConfiguration()`, right?
I'm not sure. Currently, most configuration is in `env.getTaskManagerInfo().getConfiguration()` or `env.getTaskConfiguration()`; which one would be better to merge?
[jira] [Commented] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size
[ https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571713#comment-17571713 ]
Yu Chen commented on FLINK-28577:
-
Well, I think I've found the root cause of this issue: it was mainly introduced by a mistaken modification of flink-runtime-web in [JIRA-25557|https://issues.apache.org/jira/browse/FLINK-25557]. I can open a PR to resolve the problem. cc: [~yunta]
> 1.15.1 web ui console report error about checkpoint size
>
> Key: FLINK-28577
> URL: https://issues.apache.org/jira/browse/FLINK-28577
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Web Frontend
> Affects Versions: 1.15.1
> Reporter: nobleyd
> Priority: Major
>
> 1.15.1
> 1 start-cluster
> 2 submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar
> 3 trigger savepoint: ./bin/flink savepoint {{{jobId} ./sp0}}
> {{4 open web ui for job and change to checkpoint tab, nothing showed.}}
> {{Chrome console log shows some error:}}
> {{main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null (reading 'checkpointed_size')
> at q (253.e9e8f2b56b4981f5.js:1:607974)
> at Sl (main.a7e97c2f60a2616e.js:1:186068)
> at Br (main.a7e97c2f60a2616e.js:1:184696)
> at N8 (main.a7e97c2f60a2616e.js:1:185128)
> at Br (main.a7e97c2f60a2616e.js:1:185153)
> at N8 (main.a7e97c2f60a2616e.js:1:185128)
> at Br (main.a7e97c2f60a2616e.js:1:185153)
> at N8 (main.a7e97c2f60a2616e.js:1:185128)
> at Br (main.a7e97c2f60a2616e.js:1:185153)
> at B8 (main.a7e97c2f60a2616e.js:1:191872)}}
[GitHub] [flink] DavidLiu001 commented on pull request #20338: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…
DavidLiu001 commented on PR #20338: URL: https://github.com/apache/flink/pull/20338#issuecomment-1196217453 Nice, I will follow your suggestion for the later PR
[GitHub] [flink] DavidLiu001 commented on pull request #20338: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…
DavidLiu001 commented on PR #20338: URL: https://github.com/apache/flink/pull/20338#issuecomment-1196217231 Nice, I will follow your suggestion for the later PR
[GitHub] [flink] flinkbot commented on pull request #20371: [FLINK-27908][network] Introduce HsResultPartition and HsSubpartitionView
flinkbot commented on PR #20371: URL: https://github.com/apache/flink/pull/20371#issuecomment-1196216583 ## CI report: * ee6783ba798cf9d8adfed4d750769614f3626240 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] masteryhx commented on a diff in pull request #20306: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
masteryhx commented on code in PR #20306: URL: https://github.com/apache/flink/pull/20306#discussion_r930578280
## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java: ##
@@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path, Collection tasks) throws
 wrappedStreamClosed = true;
 }
 } finally {
-if (!wrappedStreamClosed) {
+if (!wrappedStreamClosed || compression) {
 fsStream.close();
 }
Review Comment: I tried removing it and rerunning, but it still does not work, so that is not the reason. Actually, I found that the case always fails whether compression is enabled or not. What confuses me is that the case fails as soon as I wrap the stream, even if there is no other logic. It just keeps running until the timeout and doesn't throw an exception.
[GitHub] [flink] DavidLiu001 commented on pull request #20360: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…
DavidLiu001 commented on PR #20360: URL: https://github.com/apache/flink/pull/20360#issuecomment-1196214414
BUILD FAILURE: Failed to execute goal on project flink-connector-elasticsearch-base: Could not resolve dependencies for project org.apache.flink:flink-connector-elasticsearch-base:jar:1.16-SNAPSHOT: Failed to collect dependencies at org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:7.10.2 -> org.elasticsearch.plugin:rank-eval-client:jar:7.10.2: Failed to read artifact descriptor for org.elasticsearch.plugin:rank-eval-client:jar:7.10.2: Could not transfer artifact org.elasticsearch.plugin:rank-eval-client:pom:7.10.2 from/to google-maven-central (https://maven-central-eu.storage-download.googleapis.com/maven2/): Read timed out -> [Help 1]
@dannycranmer This looks like an environment issue on Azure: the dependency download timed out.
[jira] [Updated] (FLINK-27908) Introduce HsResultPartition and HsSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-27908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27908: --- Labels: pull-request-available (was: ) > Introduce HsResultPartition and HsSubpartitionView > -- > > Key: FLINK-27908 > URL: https://issues.apache.org/jira/browse/FLINK-27908 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa opened a new pull request, #20371: [FLINK-27908][network] Introduce HsResultPartition and HsSubpartitionView
reswqa opened a new pull request, #20371: URL: https://github.com/apache/flink/pull/20371
## What is the purpose of the change
*Introduce HsResultPartition and HsSubpartitionView, this is the last part of hybrid shuffle mode in the shuffle layer.*
## Brief change log
- *Extends onResultPartitionClosed to HsSpillingStrategy and add lifecycle method for hybrid shuffle component.*
- *HybridShuffleConfiguration supports set spillingStrategy name and load spillingStrategy.*
- *Introduce HsResultPartition and HsSubpartitionView.*
- *ResultPartitionFactory also supports HYBRID type and introduce config option for hybrid spilling strategy.*
## Verifying this change
This change added UT tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax
Tartarus0zm commented on PR #20252: URL: https://github.com/apache/flink/pull/20252#issuecomment-1196203437 @flinkbo run azure
[GitHub] [flink] zoltar9264 commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
zoltar9264 commented on code in PR #20152: URL: https://github.com/apache/flink/pull/20152#discussion_r930567765 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorWithCache.java: ## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL; + +/** StateChangeIterator with local cache. 
*/ +class StateChangeIteratorWithCache extends StateChangeIteratorImpl { +private static final Logger LOG = LoggerFactory.getLogger(StateChangeIteratorWithCache.class); + +private static final String CACHE_FILE_PREFIX = "dstl-"; + +private final File cacheDir; +private final ConcurrentMap cache = new ConcurrentHashMap<>(); +private final ScheduledExecutorService cacheCleanScheduler; +private final ExecutorService downloadExecutor; +private final long cacheIdleMillis; + +StateChangeIteratorWithCache(ExecutorService downloadExecutor, Configuration config) { +// TODO: 2022/5/31 add a new options for cache idle +long cacheIdleMillis = config.get(PERIODIC_MATERIALIZATION_INTERVAL).toMillis(); +File cacheDir = ConfigurationUtils.getRandomTempDirectory(config); + +this.cacheCleanScheduler = +SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", LOG); +this.downloadExecutor = downloadExecutor; +this.cacheIdleMillis = cacheIdleMillis; +this.cacheDir = cacheDir; +} + +@Override +public CloseableIterator read(StreamStateHandle handle, long offset) +throws IOException { + +if (!(handle instanceof FileStateHandle)) { +return new StateChangeFormat().read(wrapAndSeek(handle.openInputStream(), offset)); +} + +FileStateHandle fileHandle = (FileStateHandle) handle; +DataInputStream input; + +if (fileHandle.getFilePath().getFileSystem().isDistributedFS()) { + +Path dfsPath = fileHandle.getFilePath(); +FileCache fileCache = +cache.computeIfAbsent( +dfsPath, +key -> { +FileCache fCache = new FileCache(cacheDir); +downloadExecutor.execute(() -> downloadFile(fileHandle, fCache)); +return fCache; +}); + +FileInputStream fin = fileCache.openAndSeek(offset); + +input = +new DataInputStream(new BufferedInputStream(fin)) { +@Override +public void close() throws IOException { +super.close(); +if (fileCache.getRefCount() == 0) { +
[jira] [Commented] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571694#comment-17571694 ]
Lijie Wang commented on FLINK-28698:
cc [~xtsong]
> The display order of aggregated metrics should follow the order of task state transitions
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Web Frontend
> Reporter: Lijie Wang
> Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
> Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order of task state transitions.
[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang updated FLINK-28698: --- Description: !image-2022-07-27-10-00-49-345.png|width=921,height=382! Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to CAEATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which is follow the order of task state transitions. was: !image-2022-07-27-10-00-49-345.png|width=921,height=382! Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to CAEATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which is follow the order of task state > The display order of aggregated metrics should follow the order of task state > transitions > - > > Key: FLINK-28698 > URL: https://issues.apache.org/jira/browse/FLINK-28698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Lijie Wang >Priority: Major > Attachments: image-2022-07-27-10-00-49-345.png > > > !image-2022-07-27-10-00-49-345.png|width=921,height=382! > Currently, the display order of task state duration is INITIALIZING, > CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable > to change to CAEATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which is > follow the order of task state transitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang updated FLINK-28698: --- Description: !image-2022-07-27-10-00-49-345.png|width=921,height=382! Currently, the display order of task state duration is INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to CAEATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which is follow the order of task state > The display order of aggregated metrics should follow the order of task state > transitions > - > > Key: FLINK-28698 > URL: https://issues.apache.org/jira/browse/FLINK-28698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Lijie Wang >Priority: Major > Attachments: image-2022-07-27-10-00-49-345.png > > > !image-2022-07-27-10-00-49-345.png|width=921,height=382! > Currently, the display order of task state duration is INITIALIZING, > CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable > to change to CAEATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which is > follow the order of task state -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
[ https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang updated FLINK-28698: --- Attachment: image-2022-07-27-10-00-49-345.png > The display order of aggregated metrics should follow the order of task state > transitions > - > > Key: FLINK-28698 > URL: https://issues.apache.org/jira/browse/FLINK-28698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Lijie Wang >Priority: Major > Attachments: image-2022-07-27-10-00-49-345.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions
Lijie Wang created FLINK-28698: -- Summary: The display order of aggregated metrics should follow the order of task state transitions Key: FLINK-28698 URL: https://issues.apache.org/jira/browse/FLINK-28698 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Lijie Wang Attachments: image-2022-07-27-10-00-49-345.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26902) Introduce performence tune and benchmark document for table store
[ https://issues.apache.org/jira/browse/FLINK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571693#comment-17571693 ]
Jingsong Lee commented on FLINK-26902:
--
Hi [~ConradJam], you can consider, from your point of view, how such a document could help people use the table store better. For the benchmark, there is a flink-table-store-benchmark in the repo.
> Introduce performence tune and benchmark document for table store
>
> Key: FLINK-26902
> URL: https://issues.apache.org/jira/browse/FLINK-26902
> Project: Flink
> Issue Type: Improvement
> Components: Table Store
> Reporter: Jingsong Lee
> Priority: Minor
> Fix For: table-store-0.2.0
[GitHub] [flink] Vancior commented on a diff in pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API
Vancior commented on code in PR #20263: URL: https://github.com/apache/flink/pull/20263#discussion_r930546050
## flink-python/pyflink/datastream/connectors/base.py: ##
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
 super(Sink, self).__init__(sink)
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:
Review Comment: This is still needed because a sink implementing `SupportsPreprocessing` may not require preprocessing; e.g., `KafkaSink` doesn't need preprocessing when using a fixed topic.
[jira] [Closed] (FLINK-28692) Check warehouse path in CatalogFactory
[ https://issues.apache.org/jira/browse/FLINK-28692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-28692.
Resolution: Fixed
master: d30e7998b136dec0f9aded8376a516a76b467f2e
release-0.2: f149a4f349fa2ced986bec42b3d1f0cd27004343
> Check warehouse path in CatalogFactory
>
> Key: FLINK-28692
> URL: https://issues.apache.org/jira/browse/FLINK-28692
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Blocker
> Labels: pull-request-available
> Fix For: table-store-0.2.0
>
> * Does not exist: create the directory automatically.
> * Exists but is not a directory: throw an exception.
> * Exists and is a directory: pass.
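Editor's note on FLINK-28692: the three cases listed in the ticket can be sketched as below. This is only an illustration under stated assumptions: it uses `java.nio.file` on a local path, whereas the actual CatalogFactory change presumably goes through Flink's own file system abstraction; `WarehousePathCheck` and `ensureWarehouseDirectory` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class WarehousePathCheck {
    /** Applies the three rules from the ticket and returns the usable warehouse directory. */
    static Path ensureWarehouseDirectory(Path warehouse) {
        try {
            if (!Files.exists(warehouse)) {
                // Case 1: the path does not exist, so create it (including parents).
                return Files.createDirectories(warehouse);
            }
            if (!Files.isDirectory(warehouse)) {
                // Case 2: the path exists but is a file or something else.
                throw new IllegalArgumentException(
                        "Warehouse path exists but is not a directory: " + warehouse);
            }
            // Case 3: the path exists and is a directory, nothing to do.
            return warehouse;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path warehouse = Paths.get(System.getProperty("java.io.tmpdir"), "warehouse-demo");
        System.out.println(ensureWarehouseDirectory(warehouse));
    }
}
```

Failing early in the factory means a misconfigured warehouse path surfaces when the catalog is created rather than on the first table write.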
[GitHub] [flink-table-store] JingsongLi merged pull request #243: [FLINK-28692] Check warehouse path in CatalogFactory
JingsongLi merged PR #243: URL: https://github.com/apache/flink-table-store/pull/243
[GitHub] [flink] tweise commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource
tweise commented on PR #20289: URL: https://github.com/apache/flink/pull/20289#issuecomment-1196163399 @xinbinhuang looking at all the modifications to HybridSource itself I think it is necessary to take a step back here and discuss the design aspects first. The underlying sources are a sequence of bounded sources, optionally followed by an unbounded source. Therefore, there should be no need to have a "dynamic reader" that does special things. The enumerator knows upfront which splits need to be processed and when it is finished. The HybridSource already has the support to transfer the end position to the next enumerator. That was part of the FLIP and the details can be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source and you can find an example in the tests: HybridSourceITCase.sourceWithDynamicSwitchPosition -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] czy006 commented on pull request #308: [FLINK-28223] Add artifact-fetcher to the pod-template.yaml example
czy006 commented on PR #308: URL: https://github.com/apache/flink-kubernetes-operator/pull/308#issuecomment-1196161652 @morhidi PTAL
[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side
lsyldliu commented on code in PR #20361: URL: https://github.com/apache/flink/pull/20361#discussion_r929988353 ## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java: ## @@ -0,0 +1,110 @@ +package org.apache.flink.table.client.gateway.context; + +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.UserClassLoaderJarTestUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM; +import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; +import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS; +import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE; +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for adding remote jar. */ +public class AddRemoteJarITCase { +@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + Review Comment: We should not introduce this test here; the local jar case is already covered by existing tests. The remote jar test should be introduced in the e2e module.
[jira] [Updated] (FLINK-28681) The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
[ https://issues.apache.org/jira/browse/FLINK-28681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nyingping updated FLINK-28681: -- Description: In assignWindows, the initial capacity of the list of windows is determined by '(int) (size/slide)'. {code:java} List windows = new ArrayList<>((int) (size / slide)) {code} This calculation is not accurate when the sliding window length is irregular, that is, when the size is not a multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size/slide)' gives 3, but the final number of windows is 4. Although this does not affect functionality, I think the following would be better. {code:java} int len = (int) Math.ceil((double) size / slide); List windows = new ArrayList<>(len); {code} was: In assignWindows, the initial capacity of the list of windows is determined by '(int) (size/slide)'. {code:java} List windows = new ArrayList<>((int) (size / slide)) {code} This calculation is not accurate when the sliding window length is irregular, that is, when the size is not a multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size/slide)' gives 3, but the final number of windows is 4. Although this does not affect functionality, I think the following would be better. {code:java} int len = (int) Math.ceil((double) (size / slide)); List windows = new ArrayList<>(len); {code} > The number of windows allocated by sliding windows is inaccurate, when the > window length is irregular > - > > Key: FLINK-28681 > URL: https://issues.apache.org/jira/browse/FLINK-28681 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: nyingping >Priority: Minor > > In assignWindows, the initial capacity of the list of windows is determined by > '(int) (size/slide)'. > {code:java} > List windows = new ArrayList<>((int) (size / slide)) {code} > This calculation is not accurate when the sliding window length is irregular, > that is, when the size is not a multiple of the slide. > For example, if the size is 10 and the slide is 3, '(int) (size/slide)' gives > 3, but the final number of windows is 4. 
> > Although this does not affect functionality, I think the following would be better. > {code:java} > int len = (int) Math.ceil((double) size / slide); > List windows = new ArrayList<>(len); {code} >
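The difference between the two proposed snippets in this update is where the cast to double happens. The sketch below compares the variants on the issue's own numbers (size 10, slide 3). The class and method names are made up for illustration, and the integer-only variant is an additional assumption that is not part of the issue.

```java
final class SlidingWindowCapacity {

    /** Current code: integer division truncates before the cast to int. */
    static int truncatedCapacity(long size, long slide) {
        return (int) (size / slide);
    }

    /** Earlier proposal: the division is still done on longs, so ceil has nothing to round up. */
    static int ceilAfterDivision(long size, long slide) {
        return (int) Math.ceil((double) (size / slide));
    }

    /** Updated proposal: size is cast first, so the division is done in floating point. */
    static int ceilCapacity(long size, long slide) {
        return (int) Math.ceil((double) size / slide);
    }

    /** Assumed alternative for positive values: ceiling division without floating point. */
    static int ceilCapacityIntegerOnly(long size, long slide) {
        return (int) ((size + slide - 1) / slide);
    }

    public static void main(String[] args) {
        long size = 10;
        long slide = 3;
        System.out.println(truncatedCapacity(size, slide));       // 3
        System.out.println(ceilAfterDivision(size, slide));       // 3
        System.out.println(ceilCapacity(size, slide));            // 4
        System.out.println(ceilCapacityIntegerOnly(size, slide)); // 4
    }
}
```

Because the value is only the initial capacity of an `ArrayList`, an underestimate costs at most one internal resize, which matches the reporter's remark that functionality is not affected.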
[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side
lsyldliu commented on code in PR #20361: URL: https://github.com/apache/flink/pull/20361#discussion_r92650 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) { return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true); } +@Override +public void addJar(String jarPath) { Review Comment: Please add unit tests for `ADD JAR` and `SHOW JARS` in `TableEnvironmentTest`, and add an IT case for `ADD JAR` in `TableEnvironmentITCase`; you can refer to the related tests in `FunctionITCase`.
[GitHub] [flink] flinkbot commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…
flinkbot commented on PR #20370: URL: https://github.com/apache/flink/pull/20370#issuecomment-1196114714 ## CI report: * c584408092f780ceb187b5d90cb999a1d699b2ae UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28185: --- Labels: pull-request-available (was: ) > "Invalid negative offset" when using OffsetsInitializer.timestamp(.) > > > Key: FLINK-28185 > URL: https://issues.apache.org/jira/browse/FLINK-28185 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Flink 1.15.0 > Kafka 2.8.1 >Reporter: Peter Schrott >Assignee: Mason Chen >Priority: Minor > Labels: pull-request-available > Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, > NegativeOffsetSpec.scala > > > When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty > partitions – little traffic + low retention – an {{IllegalArgumentException: > Invalid negative offset}} occurs. See the stack trace below. > The problem here is that the admin client returns -1 as timestamp and > offset for empty partitions in {{KafkaAdminClient.listOffsets(.)}}. [1] > Please compare the attached screenshot. When creating the {{OffsetAndTimestamp}} > object from the admin client response, the exception is thrown. 
> {code:java} > org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition > splits due to > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) > at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) > at java.util.concurrent.FutureTask.run(FutureTask.java) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) > at > org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) > ... 8 common frames omitted > 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] > o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator > 351e440289835f2ff3e6fee31bf6e13c). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) > at >
[GitHub] [flink] mas-chen opened a new pull request, #20370: [FLINK-28185] Make TimestampOffsetsInitializer apply offset reset str…
mas-chen opened a new pull request, #20370: URL: https://github.com/apache/flink/pull/20370 …ategy and handle timestamps that do not map to an offset ## What is the purpose of the change This change allows the TimestampOffsetsInitializer to be initialized with a configured offset reset strategy. The default behavior (LATEST) is preserved. It also fixes a bug that occurs when the timestamp does not correspond to an offset in Kafka, and clarifies the exception message that is thrown. ## Brief change log - Handles EARLIEST/LATEST/NONE - For timestamps that do not correspond to an offset in Kafka, the initializer throws an explicit exception if configured with NONE. ## Verifying this change This change added tests and can be verified as follows: - Added unit tests for the various offset reset strategies and for the edge case where the timestamp does not correspond to an offset in Kafka ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs
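The fallback described in the change log above can be sketched without any Kafka or Flink dependency. Everything in this block is a hypothetical illustration, not the code in the pull request: the enum `ResetStrategy`, the method `resolveStartOffset`, and the use of an empty `OptionalLong` to model "the timestamp does not map to an offset" (the case in which, according to FLINK-28185, the admin client reports -1) are all assumptions.

```java
import java.util.OptionalLong;

final class TimestampOffsetSketch {

    /** Hypothetical stand-in for the configured offset reset strategy. */
    enum ResetStrategy {
        EARLIEST,
        LATEST,
        NONE
    }

    /**
     * Picks the starting offset for one partition.
     *
     * @param offsetForTimestamp result of the timestamp lookup; empty if no offset matches
     * @param earliestOffset earliest available offset of the partition
     * @param latestOffset latest (end) offset of the partition
     * @param strategy fallback to apply when the lookup found nothing
     */
    static long resolveStartOffset(
            OptionalLong offsetForTimestamp,
            long earliestOffset,
            long latestOffset,
            ResetStrategy strategy) {
        if (offsetForTimestamp.isPresent()) {
            return offsetForTimestamp.getAsLong();
        }
        switch (strategy) {
            case EARLIEST:
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                // NONE: fail with an explicit message instead of passing a negative offset on.
                throw new IllegalStateException(
                        "No offset found for the requested timestamp and the reset strategy is NONE.");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(OptionalLong.of(42), 0, 100, ResetStrategy.NONE));
        System.out.println(resolveStartOffset(OptionalLong.empty(), 0, 100, ResetStrategy.EARLIEST));
        System.out.println(resolveStartOffset(OptionalLong.empty(), 0, 100, ResetStrategy.LATEST));
    }
}
```

Modelling the missing offset as an empty optional keeps the -1 sentinel out of the rest of the code, so a negative value can never reach a constructor that rejects it.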
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
rkhachatryan commented on code in PR #20152: URL: https://github.com/apache/flink/pull/20152#discussion_r930493192 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorWithCache.java: ## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL; + +/** StateChangeIterator with local cache. 
*/ +class StateChangeIteratorWithCache extends StateChangeIteratorImpl { +private static final Logger LOG = LoggerFactory.getLogger(StateChangeIteratorWithCache.class); + +private static final String CACHE_FILE_PREFIX = "dstl-"; + +private final File cacheDir; +private final ConcurrentMap cache = new ConcurrentHashMap<>(); +private final ScheduledExecutorService cacheCleanScheduler; +private final ExecutorService downloadExecutor; +private final long cacheIdleMillis; + +StateChangeIteratorWithCache(ExecutorService downloadExecutor, Configuration config) { +// TODO: 2022/5/31 add a new options for cache idle +long cacheIdleMillis = config.get(PERIODIC_MATERIALIZATION_INTERVAL).toMillis(); +File cacheDir = ConfigurationUtils.getRandomTempDirectory(config); + +this.cacheCleanScheduler = +SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", LOG); +this.downloadExecutor = downloadExecutor; +this.cacheIdleMillis = cacheIdleMillis; +this.cacheDir = cacheDir; +} + +@Override +public CloseableIterator read(StreamStateHandle handle, long offset) +throws IOException { + +if (!(handle instanceof FileStateHandle)) { +return new StateChangeFormat().read(wrapAndSeek(handle.openInputStream(), offset)); +} + +FileStateHandle fileHandle = (FileStateHandle) handle; +DataInputStream input; + +if (fileHandle.getFilePath().getFileSystem().isDistributedFS()) { + +Path dfsPath = fileHandle.getFilePath(); +FileCache fileCache = +cache.computeIfAbsent( +dfsPath, +key -> { +FileCache fCache = new FileCache(cacheDir); +downloadExecutor.execute(() -> downloadFile(fileHandle, fCache)); +return fCache; +}); + +FileInputStream fin = fileCache.openAndSeek(offset); + +input = +new DataInputStream(new BufferedInputStream(fin)) { +@Override +public void close() throws IOException { +super.close(); +if (fileCache.getRefCount() == 0) { +
[jira] [Updated] (FLINK-27701) HashMapStateBackendWindowITCase. testAggregateWindowStateReader failed with Not all required tasks are currently running
[ https://issues.apache.org/jira/browse/FLINK-27701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27701: --- Labels: auto-deprioritized-major test-stability (was: stale-major test-stability) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > HashMapStateBackendWindowITCase. testAggregateWindowStateReader failed with > Not all required tasks are currently running > - > > Key: FLINK-27701 > URL: https://issues.apache.org/jira/browse/FLINK-27701 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2022-05-19T11:04:27.4331524Z May 19 11:04:27 [ERROR] Tests run: 9, Failures: > 0, Errors: 1, Skipped: 0, Time elapsed: 29.034 s <<< FAILURE! - in > org.apache.flink.state.api.HashMapStateBackendWindowITCase > 2022-05-19T11:04:27.4333055Z May 19 11:04:27 [ERROR] > org.apache.flink.state.api.HashMapStateBackendWindowITCase.testAggregateWindowStateReader > Time elapsed: 0.105 s <<< ERROR! 
> 2022-05-19T11:04:27.4333765Z May 19 11:04:27 java.lang.RuntimeException: > Failed to take savepoint > 2022-05-19T11:04:27.4334405Z May 19 11:04:27 at > org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:68) > 2022-05-19T11:04:27.4335375Z May 19 11:04:27 at > org.apache.flink.state.api.SavepointWindowReaderITCase.testAggregateWindowStateReader(SavepointWindowReaderITCase.java:149) > 2022-05-19T11:04:27.4338106Z May 19 11:04:27 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-05-19T11:04:27.4339140Z May 19 11:04:27 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-05-19T11:04:27.4339854Z May 19 11:04:27 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-05-19T11:04:27.4340560Z May 19 11:04:27 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-05-19T11:04:27.4341746Z May 19 11:04:27 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-05-19T11:04:27.4342797Z May 19 11:04:27 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-05-19T11:04:27.4343717Z May 19 11:04:27 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-05-19T11:04:27.4344909Z May 19 11:04:27 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-05-19T11:04:27.4345993Z May 19 11:04:27 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2022-05-19T11:04:27.4346981Z May 19 11:04:27 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-05-19T11:04:27.4347590Z May 19 11:04:27 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-05-19T11:04:27.4348200Z May 19 11:04:27 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-05-19T11:04:27.4348856Z May 19 11:04:27 at > 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-05-19T11:04:27.4349484Z May 19 11:04:27 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-05-19T11:04:27.4350118Z May 19 11:04:27 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-05-19T11:04:27.4350899Z May 19 11:04:27 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-05-19T11:04:27.4352057Z May 19 11:04:27 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-05-19T11:04:27.4353154Z May 19 11:04:27 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-05-19T11:04:27.4354153Z May 19 11:04:27 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-05-19T11:04:27.4354936Z May 19 11:04:27 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-05-19T11:04:27.4355560Z May 19 11:04:27 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-05-19T11:04:27.4356167Z May 19 11:04:27 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-05-19T11:04:27.4356775Z May 19 11:04:27 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-05-19T11:04:27.4357358Z May 19 11:04:27 at >
[jira] [Updated] (FLINK-27695) KafkaTransactionLogITCase failed on azure due to Could not find a valid Docker environment
[ https://issues.apache.org/jira/browse/FLINK-27695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27695: --- Labels: auto-deprioritized-major test-stability (was: stale-major test-stability) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > KafkaTransactionLogITCase failed on azure due to Could not find a valid > Docker environment > -- > > Key: FLINK-27695 > URL: https://issues.apache.org/jira/browse/FLINK-27695 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Connectors / Kafka >Affects Versions: 1.14.4 >Reporter: Huang Xingbo >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2022-05-19T02:04:23.9190098Z May 19 02:04:23 [ERROR] Tests run: 1, Failures: > 0, Errors: 1, Skipped: 0, Time elapsed: 7.404 s <<< FAILURE! - in > org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase > 2022-05-19T02:04:23.9191182Z May 19 02:04:23 [ERROR] > org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase Time > elapsed: 7.404 s <<< ERROR! > 2022-05-19T02:04:23.9192250Z May 19 02:04:23 java.lang.IllegalStateException: > Could not find a valid Docker environment. 
Please see logs and check > configuration > 2022-05-19T02:04:23.9193144Z May 19 02:04:23 at > org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$getFirstValidStrategy$4(DockerClientProviderStrategy.java:156) > 2022-05-19T02:04:23.9194653Z May 19 02:04:23 at > java.util.Optional.orElseThrow(Optional.java:290) > 2022-05-19T02:04:23.9196179Z May 19 02:04:23 at > org.testcontainers.dockerclient.DockerClientProviderStrategy.getFirstValidStrategy(DockerClientProviderStrategy.java:148) > 2022-05-19T02:04:23.9197995Z May 19 02:04:23 at > org.testcontainers.DockerClientFactory.getOrInitializeStrategy(DockerClientFactory.java:146) > 2022-05-19T02:04:23.9199486Z May 19 02:04:23 at > org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:188) > 2022-05-19T02:04:23.9200666Z May 19 02:04:23 at > org.testcontainers.DockerClientFactory$1.getDockerClient(DockerClientFactory.java:101) > 2022-05-19T02:04:23.9202109Z May 19 02:04:23 at > com.github.dockerjava.api.DockerClientDelegate.authConfig(DockerClientDelegate.java:107) > 2022-05-19T02:04:23.9203065Z May 19 02:04:23 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:316) > 2022-05-19T02:04:23.9204641Z May 19 02:04:23 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066) > 2022-05-19T02:04:23.9205765Z May 19 02:04:23 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > 2022-05-19T02:04:23.9206568Z May 19 02:04:23 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-05-19T02:04:23.9207497Z May 19 02:04:23 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-05-19T02:04:23.9208246Z May 19 02:04:23 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > 2022-05-19T02:04:23.9208887Z May 19 02:04:23 at > org.junit.runner.JUnitCore.run(JUnitCore.java:137) > 2022-05-19T02:04:23.9209691Z May 19 02:04:23 at > 
org.junit.runner.JUnitCore.run(JUnitCore.java:115) > 2022-05-19T02:04:23.9210490Z May 19 02:04:23 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43) > 2022-05-19T02:04:23.9211246Z May 19 02:04:23 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > 2022-05-19T02:04:23.9211989Z May 19 02:04:23 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > 2022-05-19T02:04:23.9212682Z May 19 02:04:23 at > java.util.Iterator.forEachRemaining(Iterator.java:116) > 2022-05-19T02:04:23.9213391Z May 19 02:04:23 at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > 2022-05-19T02:04:23.9214305Z May 19 02:04:23 at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > 2022-05-19T02:04:23.9215044Z May 19 02:04:23 at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > 2022-05-19T02:04:23.9215809Z May 19 02:04:23 at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > 2022-05-19T02:04:23.9216576Z May 19 02:04:23 at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > 2022-05-19T02:04:23.9217523Z May 19 02:04:23 at >
[jira] [Updated] (FLINK-22741) Hide Flink complexity from Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-22741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-22741: --- Labels: auto-deprioritized-major stale-minor (was: auto-deprioritized-major) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Hide Flink complexity from Stateful Functions > - > > Key: FLINK-22741 > URL: https://issues.apache.org/jira/browse/FLINK-22741 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions >Affects Versions: statefun-3.0.0 >Reporter: Stephan Ewen >Priority: Minor > Labels: auto-deprioritized-major, stale-minor > > This is an umbrella issue for various issues to hide and reduce the > complexity and surface area (and configuration space) of Apache Flink when > using Stateful Functions. > The goal of this is to create a setup and configuration that works robustly > in the vast majority of settings. Users should not be required to configure > anything Flink-specific, but only general parameters (like for example total > memory size) and StateFun specific parameters (like request timeouts and > batching). > If this happens at the cost of some minor regression in peak stream > throughput, we can most likely stomach that in StateFun, because the > performance cost is commonly dominated by the interaction between StateFun > cluster and remote function deployments.
[jira] [Updated] (FLINK-27775) FlinkKafkaProducer VS KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-27775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27775: --- Labels: features stale-major (was: features) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned and neither itself nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > FlinkKafkaProducer VS KafkaSink > --- > > Key: FLINK-27775 > URL: https://issues.apache.org/jira/browse/FLINK-27775 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.14.3 >Reporter: Jiangfei Liu >Priority: Major > Labels: features, stale-major > Attachments: Snipaste_2022-05-25_19-52-11.png > > > Sorry, my English is bad. > In Flink 1.14.3, writing 1 data to Kafka: > with FlinkKafkaProducer, it completed in 7s; > with KafkaSink, it completed in 1m40s. > Why is KafkaSink so slow?
[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase.testSlidingTimeWindow failed with restore
[ https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-28440: --- Labels: stale-critical test-stability (was: test-stability) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Critical but is unassigned and neither itself nor its Sub-Tasks have been updated for 14 days. I have gone ahead and marked it "stale-critical". If this ticket is critical, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > EventTimeWindowCheckpointingITCase.testSlidingTimeWindow failed with restore > > > Key: FLINK-28440 > URL: https://issues.apache.org/jira/browse/FLINK-28440 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: stale-critical, test-stability > > {code:java} > 2022-07-07T03:27:47.5779102Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2022-07-07T03:27:47.5779722Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2022-07-07T03:27:47.5780444Z at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > 2022-07-07T03:27:47.5781338Z at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2022-07-07T03:27:47.5781955Z at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2022-07-07T03:27:47.5782587Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2022-07-07T03:27:47.5783184Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2022-07-07T03:27:47.5783843Z at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268) > 2022-07-07T03:27:47.5784599Z at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2022-07-07T03:27:47.5785284Z at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 2022-07-07T03:27:47.5785907Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2022-07-07T03:27:47.5786528Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2022-07-07T03:27:47.5787121Z at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277) > 2022-07-07T03:27:47.5787874Z at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) > 2022-07-07T03:27:47.5788498Z at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > 2022-07-07T03:27:47.5789265Z at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > 2022-07-07T03:27:47.5789968Z at > 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2022-07-07T03:27:47.5790582Z at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 2022-07-07T03:27:47.5791198Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2022-07-07T03:27:47.5791799Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2022-07-07T03:27:47.5792351Z at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > 2022-07-07T03:27:47.5793075Z at > akka.dispatch.OnComplete.internal(Future.scala:300) > 2022-07-07T03:27:47.5793572Z at > akka.dispatch.OnComplete.internal(Future.scala:297) > 2022-07-07T03:27:47.5794075Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > 2022-07-07T03:27:47.5794586Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) > 2022-07-07T03:27:47.5795094Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2022-07-07T03:27:47.5795654Z at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) > 2022-07-07T03:27:47.5796307Z at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > 2022-07-07T03:27:47.5796922Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > 2022-07-07T03:27:47.5797574Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > 2022-07-07T03:27:47.5798196Z at >
[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart
[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571628#comment-17571628 ] David Anderson commented on FLINK-28060: [~martijnvisser] [~renqs] [~mason6345] Are we any closer to understanding this issue? Do we know how to fix it, either for a 1.15.2 release, or for 1.16.0? > Kafka Commit on checkpointing fails repeatedly after a broker restart > - > > Key: FLINK-28060 > URL: https://issues.apache.org/jira/browse/FLINK-28060 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Reproduced on MacOS and Linux. > Using Java 8, Flink 1.15.0, Kafka 2.8.1. >Reporter: Christian Lorenz >Priority: Major > Labels: pull-request-available > Attachments: flink-kafka-testjob.zip > > > When Kafka offset committing is enabled and done on Flink's checkpointing, an > error might occur if one Kafka broker is shut down which might be the leader > of that partition in Kafka's internal __consumer_offsets topic. > This is expected behaviour. But once the broker is started up again, the > next checkpoint issued by Flink should commit the offsets processed in the meantime > back to Kafka. Somehow this does not always seem to happen in Flink 1.15.0 > anymore and the offset committing is broken. A warning like the following > will be logged on each checkpoint: > {code} > [info] 14:33:13.684 WARN [Source Data Fetcher for Source: input-kafka-source > -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - > Failed to commit consumer offsets for checkpoint 35 > [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: > Offset commit failed with a retriable exception. You should retry committing > the latest consumed offsets. > [info] Caused by: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available. > {code} > To reproduce this I've attached a small Flink job program. 
To execute this, > Java 8, Scala sbt and docker / docker-compose are required. Also see readme.md > for more details. > The job can be run with `sbt run`; the Kafka cluster is started by > `docker-compose up`. If the Kafka brokers are then restarted gracefully by > e.g. `docker-compose stop kafka1` and `docker-compose start kafka1`, with > kafka2 and kafka3 afterwards, this warning will occur and no offsets will be > committed into Kafka. > This is not reproducible in Flink 1.14.4. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-27692) Support local recovery for materialized part(write, restore, discard)
[ https://issues.apache.org/jira/browse/FLINK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan closed FLINK-27692. - Assignee: Yanfei Lei Resolution: Fixed Merged into master as aef75be34d99c737f5c565703a971027ac44f855..52519a8eb695c9523c546439c66910b15f19be20 Thanks for the contribution [~Yanfei Lei]! > Support local recovery for materialized part(write, restore, discard) > - > > Key: FLINK-27692 > URL: https://issues.apache.org/jira/browse/FLINK-27692 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Support local recovery for materialized part(write, restore, discard) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] rkhachatryan merged pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog
rkhachatryan merged PR #19907: URL: https://github.com/apache/flink/pull/19907 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dannycranmer commented on pull request #20360: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…
dannycranmer commented on PR #20360: URL: https://github.com/apache/flink/pull/20360#issuecomment-1195928597 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487 ] Hongbo edited comment on FLINK-28693 at 7/26/22 5:49 PM: - Swapping flink-table-planner-loader with flink-table-planner in the opt/ folder can solve the problem. Is there any drawback to the swap? (If not, why use flink-table-planner-loader as the default, as I saw others also reported problems about this lib). was (Author: liuhb86): Swapping replacing flink-table-planner-loader with flink-table-planer in the opt/ folder can solve the problem. Is there any drawback to the swap? (If not, why use flink-table-planner-loader as the default, as I saw others also reported problems about this lib). > Codegen failed if the watermark is defined on a columnByExpression > -- > > Key: FLINK-28693 > URL: https://issues.apache.org/jira/browse/FLINK-28693 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: Hongbo >Priority: Major > > The following code will throw an exception: > > {code:java} > Table program cannot be compiled. This is a bug. Please file an issue. > ... > Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column > 54: Cannot determine simple type name "org" {code} > {color:#00}Code:{color} > {code:java} > public class TestUdf extends ScalarFunction { > @DataTypeHint("TIMESTAMP(3)") > public LocalDateTime eval(String strDate) { >return LocalDateTime.now(); > } > } > public class FlinkTest { > @Test > void testUdf() throws Exception { > //var env = StreamExecutionEnvironment.createLocalEnvironment(); > // run `gradlew shadowJar` first to generate the uber jar. > // It contains the kafka connector and a dummy UDF function. 
> var env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, > "build/libs/flink-test-all.jar"); > env.setParallelism(1); > var tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); > var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("time_stamp", DataTypes.STRING()) > .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") > .watermark("udf_ts", "udf_ts - INTERVAL '1' second") > .build()) > // the kafka server doesn't need to exist. It fails in the > compile stage before fetching data. > .option("properties.bootstrap.servers", "localhost:9092") > .option("topic", "test_topic") > .option("format", "json") > .option("scan.startup.mode", "latest-offset") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var query = tableEnv.sqlQuery("select * from test"); > var tableResult = > query.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } > }{code} > What does the code do? > # read a stream from Kafka > # create a derived column using a UDF expression > # define the watermark based on the derived column > The full callstack: > > {code:java} > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) > ~[flink-dist-1.15.1.jar:1.15.1] > at >
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20306: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
rkhachatryan commented on code in PR #20306: URL: https://github.com/apache/flink/pull/20306#discussion_r930161714 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java: ## @@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path, Collection tasks) throws wrappedStreamClosed = true; } } finally { -if (!wrappedStreamClosed) { +if (!wrappedStreamClosed || compression) { fsStream.close(); } Review Comment: An alternative could be to remove ``` OutputStream compressed = compression ? instance.decorateWithCompression(fsStream) : fsStream; ``` Could you check if that causes the build to fail? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
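The condition in the diff above can be illustrated with a minimal, self-contained sketch. It rests on an assumption the thread does not confirm: that the compression decorator does not close the stream it wraps, so the underlying stream has to be closed explicitly. `TrackingStream`, `NonClosingDecorator` and `upload` are hypothetical names for illustration only, not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class CloseUnderlyingStreamSketch {

    /** Hypothetical stand-in for the file system stream; records whether close() was called. */
    static final class TrackingStream extends ByteArrayOutputStream {
        boolean closed;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Hypothetical decorator that (by assumption) flushes but never closes the wrapped stream. */
    static final class NonClosingDecorator extends FilterOutputStream {
        NonClosingDecorator(OutputStream out) {
            super(out);
        }

        @Override
        public void close() throws IOException {
            flush(); // deliberately does not propagate close()
        }
    }

    /** Writes data and applies the same close condition as the diff: !wrappedStreamClosed || compression. */
    static TrackingStream upload(byte[] data, boolean compression) {
        TrackingStream fsStream = new TrackingStream();
        boolean wrappedStreamClosed = false;
        try {
            OutputStream wrapped = compression ? new NonClosingDecorator(fsStream) : fsStream;
            wrapped.write(data);
            wrapped.close();
            wrappedStreamClosed = true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // With a non-closing decorator, closing the wrapper is not enough.
            if (!wrappedStreamClosed || compression) {
                try {
                    fsStream.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
        return fsStream;
    }

    public static void main(String[] args) {
        System.out.println("compressed, closed = " + upload(new byte[] {1, 2, 3}, true).closed);
        System.out.println("plain, closed = " + upload(new byte[] {1, 2, 3}, false).closed);
    }
}
```

In this sketch, dropping `|| compression` from the condition would leave the underlying stream open whenever the decorator is used, which appears to be the situation the PR title describes.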
[jira] [Closed] (FLINK-26853) HeapStateBackend ignores metadata updates in certain cases
[ https://issues.apache.org/jira/browse/FLINK-26853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan closed FLINK-26853. - Fix Version/s: 1.16.0 Resolution: Fixed Merged into master as 8df50536ef913b63620d896423c39cdd01941c55. > HeapStateBackend ignores metadata updates in certain cases > -- > > Key: FLINK-26853 > URL: https://issues.apache.org/jira/browse/FLINK-26853 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.4, 1.15.0, 1.16.0 >Reporter: Roman Khachatryan >Assignee: Hangxiang Yu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > On recovery, HeapRestoreOperation reads state handles one by one; > * each handle contains metadata at the beginning; > * the metadata is always read, but not actually used if a state with the > corresponding name was already registered > In a rare case of downscaling + multiple checkpoints with different metadata; > this might lead to data being deserialized incorrectly (always using the > initial metadata). > It also prevents incremental checkpoints with schema evolution. > On first access, however, the backend itself will update (merge) metadata; so > that it doesn't affect new state updates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] rkhachatryan merged pull request #20268: [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
rkhachatryan merged PR #20268: URL: https://github.com/apache/flink/pull/20268 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog
rkhachatryan commented on code in PR #19907: URL: https://github.com/apache/flink/pull/19907#discussion_r930115553 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -407,8 +408,15 @@ public RunnableFuture> snapshot( metrics.reportSnapshotResult(snapshotResult)) .thenApply( snapshotResult -> -SnapshotResult.of( - snapshotResult.getJobManagerOwnedSnapshot(; +snapshotResult.getTaskLocalSnapshot() == null +|| snapshotResult + .getJobManagerOwnedSnapshot() +== null +? SnapshotResult.of( + snapshotResult.getJobManagerOwnedSnapshot()) +: SnapshotResult.withLocalState( + snapshotResult.getJobManagerOwnedSnapshot(), + snapshotResult.getTaskLocalSnapshot(; Review Comment: nit: there is no need to re-create the result object, and more importantly this is less readable than just casting: ``` .thenApply(this::castSnapshotResult)); ... @SuppressWarnings("unchecked") private SnapshotResult castSnapshotResult(SnapshotResult snapshotResult) { return (SnapshotResult) snapshotResult; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
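The casting idea from the review comment can be sketched in isolation. The generic type arguments were lost from the quoted snippet, so the types below are assumptions: `Snapshot`, `StateHandle` and `ChangelogHandle` are hypothetical stand-ins, not the real Flink `SnapshotResult` API. The sketch only shows that a read-only container can be widened with an unchecked cast instead of being rebuilt field by field.

```java
public class SnapshotCastSketch {

    /** Hypothetical base type for state handles. */
    interface StateHandle {
        String id();
    }

    /** Hypothetical concrete handle. */
    static final class ChangelogHandle implements StateHandle {
        private final String id;

        ChangelogHandle(String id) {
            this.id = id;
        }

        @Override
        public String id() {
            return id;
        }
    }

    /** Hypothetical immutable result holder with a job-manager-owned and an optional local part. */
    static final class Snapshot<T extends StateHandle> {
        private final T jobManagerOwned;
        private final T taskLocal; // may be null

        Snapshot(T jobManagerOwned, T taskLocal) {
            this.jobManagerOwned = jobManagerOwned;
            this.taskLocal = taskLocal;
        }

        T getJobManagerOwned() {
            return jobManagerOwned;
        }

        T getTaskLocal() {
            return taskLocal;
        }
    }

    /**
     * Widens the element type without copying. The cast is unchecked, but safe here
     * because Snapshot only ever hands values out and never accepts new ones.
     */
    @SuppressWarnings("unchecked")
    static Snapshot<StateHandle> castSnapshot(Snapshot<? extends StateHandle> snapshot) {
        return (Snapshot<StateHandle>) snapshot;
    }

    public static void main(String[] args) {
        Snapshot<ChangelogHandle> original = new Snapshot<>(new ChangelogHandle("jm"), null);
        Snapshot<StateHandle> widened = castSnapshot(original);
        // Both parts are carried over as-is, including a null local part.
        System.out.println(widened.getJobManagerOwned().id());
        System.out.println(widened.getTaskLocal());
    }
}
```

Because the same object is returned, no null checks on either part are needed, which is the readability point made in the comment.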
[GitHub] [flink] zentol commented on pull request #20355: [FLINK-28634][json] Add simple JsonSerDeSchema
zentol commented on PR #20355: URL: https://github.com/apache/flink/pull/20355#issuecomment-1195637892 > It's very common for deserializers to include an implementation of getProducedType. We extend the `AbstractDeserializationSchema` which takes care of that based on the class/typeinfo passed to the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #20369: [FLINK-28697][table] MapDataSerializer doesn't declare a serialVersionUID
flinkbot commented on PR #20369: URL: https://github.com/apache/flink/pull/20369#issuecomment-1195631087 ## CI report: * 410ffd4ee14e36e442595526a503e214bd03ba79 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
[ https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28697: --- Labels: pull-request-available (was: ) > MapDataSerializer doesn't declare a serialVersionUID > > > Key: FLINK-28697 > URL: https://issues.apache.org/jira/browse/FLINK-28697 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.15.1 >Reporter: Josh Mahonin >Priority: Major > Labels: pull-request-available > Fix For: 1.15.1 > > > MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an > InvalidClassException when attempting to serialize with different JREs for > compilation / runtime. > {code:java} > Caused by: java.io.InvalidClassException: > org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class > incompatible: stream classdesc serialVersionUID = 2533002123505507000, local > class serialVersionUID = 1622156938509929811 {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] jmahonin opened a new pull request, #20369: [FLINK-28697][table] MapDataSerializer doesn't declare a serialVersionUID
jmahonin opened a new pull request, #20369: URL: https://github.com/apache/flink/pull/20369 ## What is the purpose of the change *MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an InvalidClassException when attempting to serialize with different JREs for compilation / runtime.* ## Brief change log - *Set serialVersionUID to 1L as with the other TypeSerializers* ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. - *Verified manually that this solves the serialization mismatch* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: yes - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: don't know - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
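As a minimal sketch of the fix described above: a `Serializable` class that declares `serialVersionUID` explicitly keeps the same stream identifier regardless of which compiler or JRE built it, whereas an undeclared UID is computed from class details that can differ between compilers. `PinnedSerializer` is a hypothetical stand-in used for illustration; it is not the real `MapDataSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidSketch {

    /** Hypothetical stand-in for a serializer class with a pinned UID. */
    static final class PinnedSerializer implements Serializable {
        // Declared explicitly, so the value no longer depends on compiler-generated details.
        private static final long serialVersionUID = 1L;

        final String name;

        PinnedSerializer(String name) {
            this.name = name;
        }
    }

    /** Returns the UID that Java serialization will write for the given class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    /** Serializes and deserializes the value in memory. */
    static PinnedSerializer roundTrip(PinnedSerializer value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PinnedSerializer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(uidOf(PinnedSerializer.class));
        System.out.println(roundTrip(new PinnedSerializer("map")).name);
    }
}
```

On deserialization the UID in the stream is compared with the UID of the local class, and a mismatch raises the `InvalidClassException` quoted in the issue.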
[GitHub] [flink] alpinegizmo commented on a diff in pull request #20355: [FLINK-28634][json] Add simple JsonSerDeSchema
alpinegizmo commented on code in PR #20355: URL: https://github.com/apache/flink/pull/20355#discussion_r930088762 ## docs/content.zh/docs/connectors/datastream/formats/json.md: ## @@ -0,0 +1,77 @@ +--- +title: "JSON" +weight: 4 +type: docs +--- + + +# Json format + +To use the JSON format you need to add the Flink JSON dependency to your project: + +```xml + + org.apache.flink + flink-json + {{< version >}} + provided + +``` + +Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`. +These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`. + +The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`. + +For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`: + +```java +JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<>(SomePojo.class); +KafkaSource source = +KafkaSource.builder() +.setValueOnlyDeserializer(jsonFormat) +... +``` + +The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`. + +For example, this is how you use it with a `KafkaSink` to serialize a `POJO`: + +```java +JsonSerializationSchema jsonFormat = new JsonSerializationSchema<>(); +KafkaSink source = +KafkaSink.builder() +.setRecordSerializer( +new KafkaRecordSerializationSchemaBuilder<>() +.setValueSerializationSchema(jsonFormat) +... +``` + +## Custom Mapper + +Both schemas have constructors that accept a `SerializableSupplier`, acting a factory for object mappers. Review Comment: ```suggestion Both schemas have constructors that accept a `SerializableSupplier`, acting as a factory for object mappers. 
``` ## docs/content/docs/connectors/datastream/formats/json.md: ## @@ -0,0 +1,77 @@ +--- +title: "JSON" +weight: 4 +type: docs +--- + + +# Json format + +To use the JSON format you need to add the Flink JSON dependency to your project: + +```xml + + org.apache.flink + flink-json + {{< version >}} + provided + +``` + +Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`. +These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`. + +The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`. + +For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`: + +```java +JsonDeserializationSchema jsonFormat=new JsonDeserializationSchema<>(SomePojo.class); +KafkaSource source= +KafkaSource.builder() +.setValueOnlyDeserializer(jsonFormat) +... +``` + +The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`. + +For example, this is how you use it with a `KafkaSink` to serialize a `POJO`: + +```java +JsonSerializationSchema jsonFormat=new JsonSerializationSchema<>(); +KafkaSink source = +KafkaSink.builder() +.setRecordSerializer( +new KafkaRecordSerializationSchemaBuilder<>() +.setValueSerializationSchema(jsonFormat) +... +``` + +## Custom Mapper + +Both schemas have constructors that accept a `SerializableSupplier`, acting a factory for object mappers. +With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality. Review Comment: ```suggestion With this factory you gain full control over the created mapper, and you can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality. 
``` ## docs/content.zh/docs/connectors/datastream/formats/json.md: ## @@ -0,0 +1,77 @@ +--- +title: "JSON" +weight: 4 +type: docs +--- + + +# Json format + +To use the JSON format you need to add the Flink JSON dependency to your project: + +```xml + + org.apache.flink + flink-json + {{< version >}} + provided + +``` + +Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`. +These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`. + +The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`. + +For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`: + +```java +JsonDeserializationSchema jsonFormat = new
[GitHub] [flink] flinkbot commented on pull request #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes)
flinkbot commented on PR #20368: URL: https://github.com/apache/flink/pull/20368#issuecomment-1195615756 ## CI report: * ec687cbbb821644dc86a3642d62500e5158d8970 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
Josh Mahonin created FLINK-28697: Summary: MapDataSerializer doesn't declare a serialVersionUID Key: FLINK-28697 URL: https://issues.apache.org/jira/browse/FLINK-28697 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.15.1 Reporter: Josh Mahonin Fix For: 1.15.1 MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an InvalidClassException when attempting to serialize with different JREs for compilation / runtime. {code:java} Caused by: java.io.InvalidClassException: org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class incompatible: stream classdesc serialVersionUID = 2533002123505507000, local class serialVersionUID = 1622156938509929811 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener
[ https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28696: --- Labels: pull-request-available (was: ) > Notify all newlyAdded/Merged blocked nodes to BlocklistListener > --- > > Key: FLINK-28696 > URL: https://issues.apache.org/jira/browse/FLINK-28696 > Project: Flink > Issue Type: Sub-task >Reporter: Lijie Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > This bug was introduced by FLINK-28660. With the logic added there, > blocklist listeners are not notified when there are no newly added nodes > (only merged nodes). > -- This message was sent by Atlassian Jira (v8.20.10#820010)
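The intended behaviour can be sketched as follows. All names here (`AddResult`, `nodesToNotify`, `notifyListeners`) are hypothetical and only illustrate the idea of notifying listeners about both newly added and merged nodes; they are not the Flink blocklist API, and the notion of a "merged" node (an already blocked node whose entry was updated) is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class BlocklistNotifySketch {

    /** Hypothetical outcome of adding nodes to a blocklist. */
    static final class AddResult {
        final List<String> newlyAdded;
        final List<String> merged;

        AddResult(List<String> newlyAdded, List<String> merged) {
            this.newlyAdded = newlyAdded;
            this.merged = merged;
        }
    }

    /** Collects every node whose blocklist entry changed: newly added plus merged. */
    static List<String> nodesToNotify(AddResult result) {
        List<String> changed = new ArrayList<>(result.newlyAdded);
        changed.addAll(result.merged);
        return changed;
    }

    /** Notifies listeners whenever anything changed, not only when nodes were newly added. */
    static void notifyListeners(AddResult result, List<Consumer<List<String>>> listeners) {
        List<String> changed = nodesToNotify(result);
        if (changed.isEmpty()) {
            return; // nothing changed, so there is nothing to report
        }
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(changed);
        }
    }

    public static void main(String[] args) {
        // Only merged nodes, no newly added ones: the case described in the issue.
        AddResult onlyMerged = new AddResult(Collections.emptyList(), Arrays.asList("node-1"));
        List<Consumer<List<String>>> listeners = new ArrayList<>();
        listeners.add(nodes -> System.out.println("notified: " + nodes));
        notifyListeners(onlyMerged, listeners);
    }
}
```

A guard that checked only `newlyAdded.isEmpty()` would skip the notification in the merged-only case, which matches the symptom reported above.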
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487 ] Hongbo commented on FLINK-28693: Swapping flink-table-planner-loader with flink-table-planner in the opt/ folder can solve the problem. Is there any drawback to the swap? (If not, why use flink-table-planner-loader as the default, as I saw others also reported problems about this lib). > Codegen failed if the watermark is defined on a columnByExpression > -- > > Key: FLINK-28693 > URL: https://issues.apache.org/jira/browse/FLINK-28693 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: Hongbo >Priority: Major > > The following code will throw an exception: > > {code:java} > Table program cannot be compiled. This is a bug. Please file an issue. > ... > Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column > 54: Cannot determine simple type name "org" {code} > {color:#00}Code:{color} > {code:java} > public class TestUdf extends ScalarFunction { > @DataTypeHint("TIMESTAMP(3)") > public LocalDateTime eval(String strDate) { >return LocalDateTime.now(); > } > } > public class FlinkTest { > @Test > void testUdf() throws Exception { > //var env = StreamExecutionEnvironment.createLocalEnvironment(); > // run `gradlew shadowJar` first to generate the uber jar. > // It contains the kafka connector and a dummy UDF function. 
> var env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, > "build/libs/flink-test-all.jar"); > env.setParallelism(1); > var tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); > var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("time_stamp", DataTypes.STRING()) > .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") > .watermark("udf_ts", "udf_ts - INTERVAL '1' second") > .build()) > // the kafka server doesn't need to exist. It fails in the > compile stage before fetching data. > .option("properties.bootstrap.servers", "localhost:9092") > .option("topic", "test_topic") > .option("format", "json") > .option("scan.startup.mode", "latest-offset") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var query = tableEnv.sqlQuery("select * from test"); > var tableResult = > query.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } > }{code} > What does the code do? > # read a stream from Kafka > # create a derived column using a UDF expression > # define the watermark based on the derived column > The full call stack: > > {code:java} > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > ~[flink-dist-1.15.1.jar:1.15.1] > at >
[GitHub] [flink] wanglijie95 opened a new pull request, #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nod
wanglijie95 opened a new pull request, #20368: URL: https://github.com/apache/flink/pull/20368 ## What is the purpose of the change [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes) ## Verifying this change `DefaultBlocklistHandlerTest#testAddNewBlockedNodes` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
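The fix described in this pull request can be illustrated with a minimal sketch. It is not the code from the PR: the class `BlocklistNotifySketch`, the `Listener` interface, and the rule that a later timestamp counts as a "merge" are all assumptions made for illustration. The point it shows is that listeners are notified on the union of newly added and merged nodes, so a merge-only update is not skipped.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker; names are illustrative and not Flink's actual classes. */
final class BlocklistNotifySketch {

    /** Hypothetical listener callback. */
    interface Listener {
        void onBlockedNodes(Collection<String> nodeIds);
    }

    // nodeId -> timestamp until which the node stays blocked (assumed representation)
    private final Map<String, Long> blockedUntil = new HashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    void registerListener(Listener listener) {
        listeners.add(listener);
    }

    /** Adds or merges the given nodes and notifies listeners about every changed node. */
    List<String> addBlockedNodes(Map<String, Long> nodes) {
        List<String> newlyAdded = new ArrayList<>();
        List<String> merged = new ArrayList<>();
        for (Map.Entry<String, Long> entry : nodes.entrySet()) {
            Long previous = blockedUntil.get(entry.getKey());
            if (previous == null) {
                blockedUntil.put(entry.getKey(), entry.getValue());
                newlyAdded.add(entry.getKey());
            } else if (entry.getValue() > previous) {
                // An existing entry is extended: this is the merge-only case.
                blockedUntil.put(entry.getKey(), entry.getValue());
                merged.add(entry.getKey());
            }
        }
        // Notify on the union; checking only newlyAdded would skip merge-only updates.
        List<String> changed = new ArrayList<>(newlyAdded);
        changed.addAll(merged);
        if (!changed.isEmpty()) {
            for (Listener listener : listeners) {
                listener.onBlockedNodes(changed);
            }
        }
        return changed;
    }
}
```

Calling `addBlockedNodes` twice for the same node with a later timestamp triggers the listener both times in this sketch, which is the behavior the PR title asks for.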
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #308: [FLINK-28223] Add artifact-fetcher to the pod-template.yaml example
morhidi commented on code in PR #308: URL: https://github.com/apache/flink-kubernetes-operator/pull/308#discussion_r930060228 ## examples/pod-template-artifact-fetcher.yaml: ## @@ -0,0 +1,84 @@ + Review Comment: Pls remove this file, this is just a duplicate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28592) Custom resource counter metrics improvements
[ https://issues.apache.org/jira/browse/FLINK-28592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28592: --- Labels: pull-request-available (was: ) > Custom resource counter metrics improvements > > > Key: FLINK-28592 > URL: https://issues.apache.org/jira/browse/FLINK-28592 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Matyas Orhidi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > * add counters at global level > * add option to turn on/off the metrics -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #331: [FLINK-28592] Custom resource counter metrics improvements
morhidi opened a new pull request, #331: URL: https://github.com/apache/flink-kubernetes-operator/pull/331 ## What is the purpose of the change - add option to turn on/off the base custom resource metrics - some additional refactor around the `MetricManager` ## Brief change log - added `kubernetes.operator.resource.metrics.enabled` to turn on/off the metrics - added `TestingMetricListener` utility class - refactored the `MetricManager` to use registered `CustomResourceMetrics` interfaces - fixed generic types on `StatusRecorder` ## Verifying this change - covered by improved `FlinkDeploymentMetricsTest`, `FlinkSessionJobMetricsTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation docs added for `kubernetes.operator.resource.metrics.enabled` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28680) No space left on device on Azure e2e_2_ci tests
[ https://issues.apache.org/jira/browse/FLINK-28680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571471#comment-17571471 ] Robert Metzger commented on FLINK-28680: Sorry, I wasn't sure whether 1.14 is still maintained. Merged to release-1.14 in 274040dafad472cfe2a88e1e3038258545f1a2ef > No space left on device on Azure e2e_2_ci tests > --- > > Key: FLINK-28680 > URL: https://issues.apache.org/jira/browse/FLINK-28680 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.14.5, 1.16.0 >Reporter: Huang Xingbo >Assignee: Robert Metzger >Priority: Blocker > Labels: pull-request-available, test-stability > > Many e2e tests failed because there was not enough disk space. We previously freed up > space by cleaning up the flink-e2e directory, but at the moment this is not > enough to solve the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] maosuhan commented on a diff in pull request #14376: [FLINK-18202][PB format] New Format of protobuf
maosuhan commented on code in PR #14376: URL: https://github.com/apache/flink/pull/14376#discussion_r930035338 ## flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE: ## @@ -0,0 +1,9 @@ +flink-sql-protobuf +Copyright 2014-2021 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) Review Comment: @libenchao thanks for your comment. I have fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #20367: [FLINK-19589][Filesystem][S3] Support per-connector FileSystem configuration
flinkbot commented on PR #20367: URL: https://github.com/apache/flink/pull/20367#issuecomment-1195554304 ## CI report: * 8c636d1ba0cb8564474bcaaec4e8996111bdb7cd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-26902) Introduce performence tune and benchmark document for table store
[ https://issues.apache.org/jira/browse/FLINK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571460#comment-17571460 ] ConradJam commented on FLINK-26902: --- Hi [~lzljs3620320], I would like to take this ticket as my first contribution to table store :). Where should I start looking for information on performance tuning and benchmarks? > Introduce performence tune and benchmark document for table store > - > > Key: FLINK-26902 > URL: https://issues.apache.org/jira/browse/FLINK-26902 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Priority: Minor > Fix For: table-store-0.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571456#comment-17571456 ] Josh Mahonin commented on FLINK-19589: -- PR is open. Note that this only addresses dynamic configuration for the S3 connector. I'm happy to retarget to another ticket if that's more appropriate, or if there are suggestions to generalize this more broadly, that's fine too. > Support per-connector FileSystem configuration > -- > > Key: FLINK-19589 > URL: https://issues.apache.org/jira/browse/FLINK-19589 > Project: Flink > Issue Type: Improvement > Components: FileSystems >Affects Versions: 1.12.0 >Reporter: Padarn Wilson >Assignee: Josh Mahonin >Priority: Major > Labels: pull-request-available > Attachments: FLINK-19589.patch > > > Currently, options for file systems can only be configured globally. However, > in many cases, users would like to configure more fine-grained. > Either we allow a properties map similar to Kafka or Kinesis properties to > our connectors. > Or something like: > Management of two properties related S3 Object management: > - [Lifecycle configuration > |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html] > - [Object > tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm] > Being able to control these is useful for people who want to manage jobs > using S3 for checkpointing or job output, but need to control per job level > configuration of the tagging/lifecycle for the purposes of auditing or cost > control (for example deleting old state from S3) > Ideally, it would be possible to control this on each object being written by > Flink, or at least at a job level. > _Note_*:* Some related existing properties can be set using the hadoop module > using system properties: see for example > {code:java} > fs.s3a.acl.default{code} > which sets the default ACL on written objects. 
> *Solutions*: > 1) Modify hadoop module: > The above-linked module could be updated in order to have a new property (and > similar for lifecycle) > fs.s3a.tags.default > which could be a comma separated list of tags to set. For example > {code:java} > fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code} > This seems like a natural place to put this logic (and is outside of Flink if > we decide to go this way). However, it does not allow for a sink and checkpoint > to have different values for these. > 2) Expose withTagging from module > The hadoop module used by Flink's existing filesystem has already exposed put > request level tagging (see > [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]). > This could be used in the Flink filesystem plugin to expose these options. A > possible approach could be to somehow incorporate it into the file path, e.g., > {code:java} > path = "TAGS:s3://bucket/path"{code} > Or possibly as an option that can be applied to the checkpoint and sink > configurations, e.g., > {code:java} > env.getCheckpointingConfig().setS3Tags(TAGS) {code} > and similar for a file sink. > _Note_: The lifecycle can also be managed using the module: see > [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html]. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
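The comma-separated tag list proposed in solution 1 could be parsed roughly as follows. This is only a sketch: `fs.s3a.tags.default` is a property proposed in the ticket, not an existing option as far as the ticket states, and the class `TagListSketch` and its method `parseTags` are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; not part of Flink or Hadoop. */
final class TagListSketch {

    /** Parses a value such as "jobname:JOBNAME,owner:OWNER" into an ordered key/value map. */
    static Map<String, String> parseTags(String spec) {
        Map<String, String> tags = new LinkedHashMap<>();
        if (spec == null || spec.trim().isEmpty()) {
            return tags;
        }
        for (String item : spec.split(",")) {
            // Split on the first ':' only, so values may themselves contain ':'.
            String[] keyValue = item.trim().split(":", 2);
            if (keyValue.length != 2 || keyValue[0].isEmpty()) {
                throw new IllegalArgumentException("Malformed tag entry: " + item);
            }
            tags.put(keyValue[0], keyValue[1]);
        }
        return tags;
    }
}
```

A map like this could then be handed to whatever tagging call the filesystem plugin ends up exposing; that part is left open in the ticket and is not sketched here.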
[jira] [Updated] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19589: --- Labels: pull-request-available (was: ) > Support per-connector FileSystem configuration > -- > > Key: FLINK-19589 > URL: https://issues.apache.org/jira/browse/FLINK-19589 > Project: Flink > Issue Type: Improvement > Components: FileSystems >Affects Versions: 1.12.0 >Reporter: Padarn Wilson >Assignee: Josh Mahonin >Priority: Major > Labels: pull-request-available > Attachments: FLINK-19589.patch > > > Currently, options for file systems can only be configured globally. However, > in many cases, users would like to configure more fine-grained. > Either we allow a properties map similar to Kafka or Kinesis properties to > our connectors. > Or something like: > Management of two properties related S3 Object management: > - [Lifecycle configuration > |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html] > - [Object > tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm] > Being able to control these is useful for people who want to manage jobs > using S3 for checkpointing or job output, but need to control per job level > configuration of the tagging/lifecycle for the purposes of auditing or cost > control (for example deleting old state from S3) > Ideally, it would be possible to control this on each object being written by > Flink, or at least at a job level. > _Note_*:* Some related existing properties can be set using the hadoop module > using system properties: see for example > {code:java} > fs.s3a.acl.default{code} > which sets the default ACL on written objects. > *Solutions*: > 1) Modify hadoop module: > The above-linked module could be updated in order to have a new property (and > similar for lifecycle) > fs.s3a.tags.default > which could be a comma separated list of tags to set. 
For example > {code:java} > fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code} > This seems like a natural place to put this logic (and is outside of Flink if > we decide to go this way). However, it does not allow for a sink and checkpoint > to have different values for these. > 2) Expose withTagging from module > The hadoop module used by Flink's existing filesystem has already exposed put > request level tagging (see > [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]). > This could be used in the Flink filesystem plugin to expose these options. A > possible approach could be to somehow incorporate it into the file path, e.g., > {code:java} > path = "TAGS:s3://bucket/path"{code} > Or possibly as an option that can be applied to the checkpoint and sink > configurations, e.g., > {code:java} > env.getCheckpointingConfig().setS3Tags(TAGS) {code} > and similar for a file sink. > _Note_: The lifecycle can also be managed using the module: see > [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html]. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] jmahonin opened a new pull request, #20367: [FLINK-19589][Filesystem][S3] Support per-connector FileSystem configuration
jmahonin opened a new pull request, #20367: URL: https://github.com/apache/flink/pull/20367 ## What is the purpose of the change Introduces dynamic filesystem configuration for S3 ## Brief change log - *Add URI query parameter parsing and apply `fs.s3a.` entries to Hadoop Configuration for S3/S3A* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change added tests and can be verified as follows: - *Added unit test that verifies URI parameters are parsed and applied to Hadoop configuration* - *A version of this is running in production for an S3-backed sink using an AssumedRoleProvider and dynamic role ARN* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: yes ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
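The change log entry above (parse URI query parameters and apply `fs.s3a.` entries to the configuration) might look roughly like the following. This is a sketch under assumptions, not the PR's implementation: the class `S3UriOptionsSketch` and the method `extractS3aOptions` are hypothetical, and the result is a plain `Map` rather than a Hadoop `Configuration` so that the example stays self-contained. It uses `URLDecoder.decode(String, Charset)`, which requires Java 10 or later.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; not the code from the pull request. */
final class S3UriOptionsSketch {

    private static final String PREFIX = "fs.s3a.";

    /** Collects the query parameters of the URI whose key starts with "fs.s3a.". */
    static Map<String, String> extractS3aOptions(URI uri) {
        Map<String, String> options = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return options;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip parameters without a key or without '='
            }
            String key = decode(pair.substring(0, eq));
            String value = decode(pair.substring(eq + 1));
            if (key.startsWith(PREFIX)) {
                options.put(key, value);
            }
        }
        return options;
    }

    private static String decode(String raw) {
        return URLDecoder.decode(raw, StandardCharsets.UTF_8);
    }
}
```

For a path such as `s3://bucket/path?fs.s3a.assumed.role.arn=...&other=1`, only the `fs.s3a.assumed.role.arn` entry would be returned; each returned entry could then be set on the filesystem configuration used for that path.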
[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test
taosiyuan163 commented on code in PR #132: URL: https://github.com/apache/flink-ml/pull/132#discussion_r930018268 ## flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java: ## @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.stats.chisqtest; + +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.DataStreamUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.math3.distribution.ChiSquaredDistribution; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import 
java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Chi-square test of independence of variables in a contingency table. This Transformer computes + * the chi-square statistic,p-value,and dof(number of degrees of freedom) for every feature in the + * contingency table, which constructed from the `observed` for each categorical values. All label + * and feature values must be categorical. + * + * See: http://en.wikipedia.org/wiki/Chi-squared_test. + */ +public class ChiSqTest implements AlgoOperator, ChiSqTestParams { + +final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey"; +final String bcLabelMarginsKey = "bcLabelMarginsKey"; + +private final Map, Object> paramMap = new HashMap<>(); + +public ChiSqTest() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public Table[] transform(Table... inputs) { + +final String[] inputCols = getInputCols(); +String labelCol = getLabelCol(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); + +SingleOutputStreamOperator> colAndFeatureAndLabel = +tEnv.toDataStream(inputs[0]) +.flatMap(new ExtractColAndFeatureAndLabel(inputCols, labelCol)); + +// compute the observed frequencies +DataStream> observedFreq = +DataStreamUtils.mapPartition( +colAndFeatureAndLabel.keyBy(Tuple3::hashCode), +new GenerateObservedFrequencies()); + +SingleOutputStreamOperator> filledObservedFreq = Review Comment: > How about we just compute the `distinct labels` and postpone the `fill` operation to Line#199, e.g., `DataStream> categoricalStatistics =...`? > > Using `parallellism=1` for computing all data is not efficient usually.
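The Javadoc in the diff above describes computing a chi-square statistic and the degrees of freedom from a contingency table. A minimal, non-distributed sketch of that arithmetic is shown below. It is not the `ChiSqTest` operator from the pull request: the class `ChiSquareSketch` is hypothetical, it works on an in-memory table of observed counts, and it leaves out the p-value, which the PR derives with `ChiSquaredDistribution` from commons-math3.

```java
/** Hypothetical in-memory helper; not the Flink ML operator under review. */
final class ChiSquareSketch {

    /** Pearson chi-square statistic: sum over cells of (observed - expected)^2 / expected. */
    static double statistic(long[][] observed) {
        int rows = observed.length;
        int cols = observed[0].length;
        double[] rowSum = new double[rows];
        double[] colSum = new double[cols];
        double total = 0;
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                rowSum[i] += observed[i][j];
                colSum[j] += observed[i][j];
                total += observed[i][j];
            }
        }
        double chi2 = 0;
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                // Expected count under independence of the row and column variables.
                double expected = rowSum[i] * colSum[j] / total;
                if (expected > 0) {
                    double diff = observed[i][j] - expected;
                    chi2 += diff * diff / expected;
                }
            }
        }
        return chi2;
    }

    /** Degrees of freedom for an r x c contingency table: (r - 1) * (c - 1). */
    static int degreesOfFreedom(long[][] observed) {
        return (observed.length - 1) * (observed[0].length - 1);
    }
}
```

In the distributed setting discussed in the review, the row and column sums correspond to the label and categorical margins that the operator broadcasts; the per-cell formula stays the same.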
[jira] [Updated] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener
[ https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang updated FLINK-28696: --- Description: This bug was introduced by FLINK-28660. With the newly added logic, the blocklist listener is not notified when there are no newly added nodes (only merged nodes). > Notify all newlyAdded/Merged blocked nodes to BlocklistListener > --- > > Key: FLINK-28696 > URL: https://issues.apache.org/jira/browse/FLINK-28696 > Project: Flink > Issue Type: Sub-task >Reporter: Lijie Wang >Priority: Major > Fix For: 1.16.0 > > > This bug was introduced by FLINK-28660. With the newly added logic, the > blocklist listener is not notified when there are no newly added nodes > (only merged nodes). > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener
Lijie Wang created FLINK-28696: -- Summary: Notify all newlyAdded/Merged blocked nodes to BlocklistListener Key: FLINK-28696 URL: https://issues.apache.org/jira/browse/FLINK-28696 Project: Flink Issue Type: Sub-task Reporter: Lijie Wang Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27536) Rename method parameter in AsyncSinkWriter
[ https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571447#comment-17571447 ] DavidLiu commented on FLINK-27536: -- [~dannycranmer] Please help review this pull request again; it has been updated. [https://github.com/apache/flink/pull/20360] > Rename method parameter in AsyncSinkWriter > -- > > Key: FLINK-27536 > URL: https://issues.apache.org/jira/browse/FLINK-27536 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Assignee: DavidLiu >Priority: Minor > Labels: pull-request-available > > Change the abstract method's parameter naming in AsyncSinkWriter. > From > Consumer> requestResult > to > Consumer> requestToRetry > or something similar. > > This is because the consumer here is supposed to accept a list of requests > that need to be retried. -- This message was sent by Atlassian Jira (v8.20.10#820010)
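The reason for the rename can be seen in a small sketch of the callback pattern the ticket describes. This is not the `AsyncSinkWriter` API: the class `RetryConsumerSketch`, the method `submit`, the use of `String` entries, and the rule that entries starting with "bad" fail are all stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of a consumer that receives the entries to retry. */
final class RetryConsumerSketch {

    /**
     * Pretends to send the entries and hands the failed ones to the consumer.
     * The consumer receives the requests that need a retry, not a general result,
     * which is why a name like requestToRetry describes it better.
     */
    static void submit(List<String> entries, Consumer<List<String>> requestToRetry) {
        List<String> failed = new ArrayList<>();
        for (String entry : entries) {
            if (entry.startsWith("bad")) { // stand-in for a failed send
                failed.add(entry);
            }
        }
        requestToRetry.accept(failed);
    }
}
```

A caller would typically re-queue whatever the consumer receives; an empty list means that nothing needs to be retried.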
[jira] (FLINK-27536) Rename method parameter in AsyncSinkWriter
[ https://issues.apache.org/jira/browse/FLINK-27536 ] DavidLiu deleted comment on FLINK-27536: -- was (Author: JIRAUSER289843): [~dannycranmer] Please help review this pull request again; it has been updated. [https://github.com/apache/flink/pull/20360] > Rename method parameter in AsyncSinkWriter > -- > > Key: FLINK-27536 > URL: https://issues.apache.org/jira/browse/FLINK-27536 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Assignee: DavidLiu >Priority: Minor > Labels: pull-request-available > > Change the abstract method's parameter naming in AsyncSinkWriter. > From > Consumer> requestResult > to > Consumer> requestToRetry > or something similar. > > This is because the consumer here is supposed to accept a list of requests > that need to be retried. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28695) Fail to send partition request to restarted taskmanager
Simonas created FLINK-28695: --- Summary: Fail to send partition request to restarted taskmanager Key: FLINK-28695 URL: https://issues.apache.org/jira/browse/FLINK-28695 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes, Runtime / Network Affects Versions: 1.15.1, 1.15.0 Reporter: Simonas Attachments: deployment.txt, job_log.txt, jobmanager_config.txt, jobmanager_logs.txt, pod_restart.txt, taskmanager_config.txt After upgrading to *1.15.1*, we started getting an error while deploying a job {code:java} org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145) {code} {code:java} Caused by: org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source){code} After investigation, we managed to narrow it down to the exact sequence in which this issue happens: # Deploying the job on a fresh Kubernetes session cluster with multiple TaskManagers (TM1 and TM2) is successful. The job has multiple partitions running on both TM1 and TM2. # One TaskManager, TM2 (XXX.XXX.XX.32), fails because of an unrelated issue, for example an OOM exception. # The Kubernetes pod with TM2 is restarted. The pod retains the same IP address as before. # The JobManager is able to pick up the restarted TM2 (XXX.XXX.XX.32). # The job is restarted because it was running on the failed TaskManager TM2. # The TM1 data channel to TM2 is closed and we get LocalTransportException: Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed during the job's running stage. # When we explicitly delete the pod with TM2, a new pod is created with a different IP address and the job is able to start again. 
It is important to note that we didn't encounter this issue with the previous *1.14.4* version; TaskManager restarts didn't cause such an error. Please see the attached Kubernetes deployments and reduced logs from the JobManager. The TaskManager logs did show errors before the error, but they don't show anything significant after the restart. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongbo updated FLINK-28693: --- Description: The following code will throw an exception: {code:java} Table program cannot be compiled. This is a bug. Please file an issue. ... Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: Cannot determine simple type name "org" {code} {color:#00}Code:{color} {code:java} public class TestUdf extends ScalarFunction { @DataTypeHint("TIMESTAMP(3)") public LocalDateTime eval(String strDate) { return LocalDateTime.now(); } } public class FlinkTest { @Test void testUdf() throws Exception { //var env = StreamExecutionEnvironment.createLocalEnvironment(); // run `gradlew shadowJar` first to generate the uber jar. // It contains the kafka connector and a dummy UDF function. var env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "build/libs/flink-test-all.jar"); env.setParallelism(1); var tableEnv = StreamTableEnvironment.create(env); tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") .schema(Schema.newBuilder() .column("time_stamp", DataTypes.STRING()) .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") .watermark("udf_ts", "udf_ts - INTERVAL '1' second") .build()) // the kafka server doesn't need to exist. It fails in the compile stage before fetching data. .option("properties.bootstrap.servers", "localhost:9092") .option("topic", "test_topic") .option("format", "json") .option("scan.startup.mode", "latest-offset") .build()); testTable.printSchema(); tableEnv.createTemporaryView("test", testTable ); var query = tableEnv.sqlQuery("select * from test"); var tableResult = query.executeInsert(TableDescriptor.forConnector("print").build()); tableResult.await(); } }{code} What does the code do? 
# read a stream from Kafka # create a derived column using a UDF expression # define the watermark based on the derived column The full call stack: {code:java} org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1] at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1] at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1] at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.15.1.jar:1.15.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929988353

## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java: ##
@@ -0,0 +1,110 @@
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for adding remote jar. */
+public class AddRemoteJarITCase {
+@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+

Review Comment:
We should not introduce this test; the local jar case is already covered by existing tests. The remote jar test should be introduced in the e2e module.

## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java: ##
@@ -156,13 +156,7 @@ public void testAddJarWithRelativePath() throws IOException {
 @Test
 public void testAddIllegalJar() {
-validateAddJarWithException("/path/to/illegal.jar", "JAR file does not exist");
-}
-
-@Test
-public void testAddRemoteJar() {
-validateAddJarWithException(

Review Comment:
This test should not be removed.

## flink-table/flink-sql-client/pom.xml: ##
@@ -515,6 +522,26 @@ under the License.
 provided
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs

Review Comment:
I think introducing an HDFS dependency is a little heavy. We should test remote jars in the e2e module, so these tests should be placed in the flink-end-to-end-tests-sql module. For how to use an HDFS cluster in an e2e test, you can refer to the flink-end-to-end-tests-hbase module.

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ##
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
 return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
 }
+@Override
+public void addJar(String jarPath) {

Review Comment:
Please add UTs for `ADD JAR` and `SHOW JARS` in `TableEnvironmentITCase`, and add an IT case for `ADD JAR` in `TableEnvironmentITCase`; you can refer to the related tests in `FunctionITCase`.

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ##
@@ -575,6 +576,20 @@ void createFunction(
 */
 boolean dropTemporaryFunction(String path);
+/**
+ * Add jar to the classLoader for use.
+ *
+ * @param jarPath The jar path to be added.
+ */
+void addJar(String jarPath);

Review Comment:
We should not introduce these two methods. They are public API, and we cannot introduce public API arbitrarily; it should be discussed in the community first. We should reuse the `AddJarOperation` and `ShowJarsOperation`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571444#comment-17571444 ]

Hongbo commented on FLINK-28693:

Is it caused by a change in the codegen logic, or by a classloader problem because of the new module `flink-table-planner-loader`?

> Codegen failed if the watermark is defined on a columnByExpression
> --
>
> Key: FLINK-28693
> URL: https://issues.apache.org/jira/browse/FLINK-28693
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.15.1
> Reporter: Hongbo
> Priority: Major
[jira] [Comment Edited] (FLINK-27536) Rename method parameter in AsyncSinkWriter
[ https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571196#comment-17571196 ]

DavidLiu edited comment on FLINK-27536 at 7/26/22 1:54 PM:
---

[~dannycranmer] Please help review this pull request again; it has been updated.
[https://github.com/apache/flink/pull/20360]

was (Author: JIRAUSER289843):
[~dannycranmer] The old PR was closed; please help review the new pull request [https://github.com/apache/flink/pull/20360]. The build failure was fixed and verified locally.

> Rename method parameter in AsyncSinkWriter
> --
>
> Key: FLINK-27536
> URL: https://issues.apache.org/jira/browse/FLINK-27536
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.15.0
> Reporter: Zichen Liu
> Assignee: DavidLiu
> Priority: Minor
> Labels: pull-request-available
>
> Change the abstract method's parameter naming in AsyncSinkWriter.
> From
> Consumer<List<RequestEntryT>> requestResult
> to
> Consumer<List<RequestEntryT>> requestToRetry
> or something similar.
>
> This is because the consumer here is supposed to accept a list of requests that need to be retried.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
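The rationale for the rename can be illustrated with a small standalone sketch. It is not Flink's AsyncSinkWriter: the class `RetrySketch`, the `String` entry type, and the rule that entries starting with "bad" fail are all assumptions made up for illustration. It only shows why a callback that receives the entries to be retried reads more naturally under a name like `requestToRetry`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a sink writer; NOT Flink's actual AsyncSinkWriter. */
public class RetrySketch {

    /**
     * Pretends to send each entry. Entries starting with "bad" are treated as
     * failed (an assumption for this sketch) and handed to the callback so
     * that the caller can queue them for retry.
     */
    public static void submitRequestEntries(
            List<String> requestEntries, Consumer<List<String>> requestToRetry) {
        List<String> failed = new ArrayList<>();
        for (String entry : requestEntries) {
            if (entry.startsWith("bad")) {
                failed.add(entry);
            }
        }
        // The consumer accepts only the entries that need another attempt.
        requestToRetry.accept(failed);
    }

    /** Collects whatever the callback reports as needing a retry. */
    public static List<String> collectRetries(List<String> entries) {
        List<String> retryQueue = new ArrayList<>();
        submitRequestEntries(entries, retryQueue::addAll);
        return retryQueue;
    }

    public static void main(String[] args) {
        // Only the failed entry ends up in the retry queue.
        System.out.println(collectRetries(List.of("ok-1", "bad-2", "ok-3")));
    }
}
```

With the old name `requestResult`, a reader could reasonably expect the callback to receive successful results; the sketch passes only the failures, which is the behaviour the issue describes.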
[jira] [Updated] (FLINK-28694) Set pipeline.name to resource name by default for application deployments
[ https://issues.apache.org/jira/browse/FLINK-28694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gyula Fora updated FLINK-28694:
---
    Summary: Set pipeline.name to resource name by default for application deployments (was: Set pipeline.name to resource name by default for Applications)

> Set pipeline.name to resource name by default for application deployments
> -
>
> Key: FLINK-28694
> URL: https://issues.apache.org/jira/browse/FLINK-28694
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Gyula Fora
> Assignee: Gyula Fora
> Priority: Minor
>
> I think it would be nice to set pipeline.name by default to the resource name for application deployments.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-28694) Set pipeline.name to resource name by default for application deployments
[ https://issues.apache.org/jira/browse/FLINK-28694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571443#comment-17571443 ]

Gyula Fora commented on FLINK-28694:

[~wangyang0918] do you see any downsides?

> Set pipeline.name to resource name by default for application deployments
> -
>
> Key: FLINK-28694
> URL: https://issues.apache.org/jira/browse/FLINK-28694
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Gyula Fora
> Assignee: Gyula Fora
> Priority: Minor
>
> I think it would be nice to set pipeline.name by default to the resource name for application deployments.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
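The proposed default can be sketched as follows. This is not the Kubernetes operator's implementation: the class `PipelineNameDefault`, the method `withDefaultPipelineName`, and the plain `Map` used for configuration are assumptions for illustration. Only the option key `pipeline.name` comes from the issue. The point is that a value set explicitly by the user should win, and the resource name is used only as a fallback.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; NOT the operator's actual code. */
public class PipelineNameDefault {

    // Option key named in the issue.
    static final String PIPELINE_NAME = "pipeline.name";

    /**
     * Returns a copy of the user configuration in which pipeline.name falls
     * back to the resource name when the user has not set it.
     */
    public static Map<String, String> withDefaultPipelineName(
            Map<String, String> userConf, String resourceName) {
        Map<String, String> effective = new HashMap<>(userConf);
        // putIfAbsent keeps an explicitly configured name untouched.
        effective.putIfAbsent(PIPELINE_NAME, resourceName);
        return effective;
    }

    public static void main(String[] args) {
        // "my-deployment" and "custom-name" are made-up example values.
        Map<String, String> unset = new HashMap<>();
        System.out.println(withDefaultPipelineName(unset, "my-deployment").get(PIPELINE_NAME));

        Map<String, String> explicit = new HashMap<>();
        explicit.put(PIPELINE_NAME, "custom-name");
        System.out.println(withDefaultPipelineName(explicit, "my-deployment").get(PIPELINE_NAME));
    }
}
```

One downside worth checking with such a default is that jobs which set their name programmatically could see it overridden or shadowed, depending on where in the configuration chain the fallback is applied.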
[jira] [Updated] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hongbo updated FLINK-28693:
---
    Description:
The following code will throw an exception:

{code:java}
Table program cannot be compiled. This is a bug. Please file an issue.
...
Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: Cannot determine simple type name "org"
{code}

Code:

{code:java}
public class TestUdf extends ScalarFunction {
    @DataTypeHint("TIMESTAMP(3)")
    public LocalDateTime eval(String strDate) {
        return LocalDateTime.now();
    }
}

public class FlinkTest {
    @Test
    void testUdf() throws Exception {
        //var env = StreamExecutionEnvironment.createLocalEnvironment();
        // run `gradlew shadowJar` first to generate the uber jar.
        // It contains the kafka connector and a dummy UDF function.
        var env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
                "build/libs/flink-test-all.jar");
        env.setParallelism(1);
        var tableEnv = StreamTableEnvironment.create(env);
        tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);

        var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("time_stamp", DataTypes.STRING())
                        .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
                        .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
                        .build())
                // the kafka server doesn't need to exist. It fails in the compile stage before fetching data.
                .option("properties.bootstrap.servers", "localhost:9092")
                .option("topic", "test_topic")
                .option("format", "json")
                .option("scan.startup.mode", "latest-offset")
                .build());
        testTable.printSchema();
        tableEnv.createTemporaryView("test", testTable);
        var query = tableEnv.sqlQuery("select * from test");
        var tableResult = query.executeInsert(TableDescriptor.forConnector("print").build());
        tableResult.await();
    }
}
{code}

What does the code do?
 # read a stream from Kafka
 # create a derived column using a UDF expression
 # define the watermark based on the derived column

The full call stack:

{code:java}
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.15.1.jar:1.15.1]
    at