[jira] [Created] (FLINK-28700) Table store sink fails to commit for Flink 1.14 batch job

2022-07-26 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-28700:
---

 Summary: Table store sink fails to commit for Flink 1.14 batch job
 Key: FLINK-28700
 URL: https://issues.apache.org/jira/browse/FLINK-28700
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.2.0, table-store-0.3.0
Reporter: Caizhi Weng
 Fix For: table-store-0.2.0, table-store-0.3.0


We can't get configurations from DummyStreamExecutionEnvironment in Flink 1.14 
(this is fixed by FLINK-26709 in Flink 1.15), so we have to use Java reflection 
to check whether this execution environment is for a batch job or a streaming 
job.
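
A rough sketch of such a reflection-based check (illustrative only; the field 
names "realExecEnv" and "configuration" are assumptions, not verified against 
the Flink 1.14 sources):

{code:java}
import java.lang.reflect.Field;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class BatchModeDetector {

    /** Returns true if the (possibly wrapped) environment runs in BATCH mode. */
    public static boolean isBatch(StreamExecutionEnvironment env)
            throws ReflectiveOperationException {
        StreamExecutionEnvironment real = env;
        // DummyStreamExecutionEnvironment hides the real environment, so unwrap
        // it reflectively; "realExecEnv" is a hypothetical field name.
        if ("DummyStreamExecutionEnvironment".equals(env.getClass().getSimpleName())) {
            Field wrapped = env.getClass().getDeclaredField("realExecEnv");
            wrapped.setAccessible(true);
            real = (StreamExecutionEnvironment) wrapped.get(env);
        }
        // Read the private configuration field instead of a (missing) accessor.
        Field conf = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        conf.setAccessible(true);
        Configuration configuration = (Configuration) conf.get(real);
        return configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH;
    }
}
{code}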





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

2022-07-26 Thread GitBox


fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930644202


##
flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q:
##
@@ -0,0 +1,121 @@
+# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# test hive catalog
+# ==========================================================================
+
+show current catalog;
+!output
++----------------------+
+| current catalog name |
++----------------------+
+|                 hive |
++----------------------+
+1 row in set
+!ok
+
+show databases;
+!output
++---------------+
+| database name |
++---------------+
+|       default |
++---------------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+create database additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+use additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table param_types_table (
+dec DECIMAL(10, 10),
+ch CHAR(5),
+vch VARCHAR(15)
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++-------------------+
+|        table name |
++-------------------+
+| param_types_table |
++-------------------+
+1 row in set
+!ok
+
+show current database;
+!output
++--------------------------+
+|    current database name |
++--------------------------+
+| additional_test_database |
++--------------------------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test hive table with parameterized types
+# ==========================================================================
+
+describe hive.additional_test_database.param_types_table;
+!output
++------+------+------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |

Review Comment:
   Yes. Maybe we should introduce a hive-style describe?






[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

2022-07-26 Thread GitBox


xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196291153

   
   @tweise Thank you so much for reviewing the PR! I just realized that I might 
have misread the Jira issue as `HybridSource: Support dynamic stop position in 
HybridSource` instead of `HybridSource: Support dynamic stop position in 
FileSource`. So this PR actually aims to design a _**generic interface that 
allows any source to participate in a dynamic source switch**_. With that in 
mind, let me explain how I came up with the current design & implementation.
   
   After reviewing the current logic of the hybrid source (amazing work!!), I 
understand that the current implementation supports transferring the end 
position to the next enumerator. However, it lacks a mechanism to find out 
where the end position is (e.g. the offset of a Kafka partition). And these 
"end positions" are probably unknown beforehand; otherwise it would be the 
same as a [fixed start 
position](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#fixed-start-position-at-graph-construction-time).
   Therefore, I think the key is to transfer the "end position" from the 
source reader to the enumerator during the source switch.
   There are a few points to consider:
   
   1. What would be the "end position" to transfer to the next source?
   
   I believe this varies by use case. Some may find it enough to use 
`file.path`, while others may need to derive the timestamp or offset from the 
content of the file (e.g. a Kafka archive, where the implementation can vary 
across companies). Since we can't anticipate all use cases, passing all 
finished splits seems to be a reasonable solution here, letting the developer 
decide how to derive the position from them.
   
   2. Where to make the changes?
   
   I aimed to implement this such that most existing sources can benefit from 
it out of the box, with minimal changes and no breaking changes to them.
   
   
   3. How to store the "finished splits" before the source switch?
   
   Per FLIP-27, the enumerator only knows what splits to consume but not the 
actual progress - only the source reader knows about it. So we need to store 
the finished splits and transfer them to the enumerator during the source 
switch. However, most existing sources implement `SourceReaderBase`, which 
[purges them from 
state](https://github.com/apache/flink/blob/3e2620bc785b3b4d82f1f188eb7b1e0e129b14d3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L193-L194)
 once it reaches the end position. One naive solution would be to adjust 
`SourceReaderBase` to also store finished splits in its state. However, this 
would affect all sources running in production and is probably a big 
backward-incompatible change right away. Therefore, I decided to store them 
only in the `HybridSourceReader`; existing sources only need to implement one 
method (`DynamicHybridSourceReader::getFinishedSplits`) that allows the 
finished splits to be extracted during checkpoints and the source switch. This 
process is transparent to all existing sources and only happens when used with 
the `HybridSource`.
   
   With the above points, the current implementation works as follows:
   - On each checkpoint, `HybridSourceReader` retrieves the finished splits 
(marked with `HybridSourceSplit.isFinished = true`) from the underlying reader 
and checkpoints them for persistence along with the unfinished splits.
   - Upon source switch, `HybridSourceReader` sends all the finished splits in 
a `SourceReaderFinishedEvent` to the enumerator.
   - The enumerator passes those finished splits along in the 
`SourceSwitchContext` to the next source, which can then use the splits to 
derive the start positions.
   
    Changes required on existing non-hybrid sources:
   - Implement `DynamicHybridSourceReader::getFinishedSplits` on 
`SourceReaderBase`.
   
    API changes on `HybridSource` (sketched below):
   - Added `SourceSwitchContext::getPreviousSplits`, which returns the 
finished splits from the previous source.
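   
   To make the proposal concrete, a minimal sketch of the two interfaces as 
described above (illustrative only; the exact type parameters and signatures 
are open for discussion):
   
   ```java
   import java.util.List;
   
   import org.apache.flink.api.connector.source.SourceSplit;
   
   /** A reader that can expose its finished splits to the hybrid source machinery. */
   interface DynamicHybridSourceReader<SplitT extends SourceSplit> {
       /** Splits this reader has fully consumed, kept until the source switch. */
       List<SplitT> getFinishedSplits();
   }
   
   /** Handed to the next source on switch; extended with access to previous splits. */
   interface SourceSwitchContext<FromSplitT extends SourceSplit> {
       /** Finished splits of the previous source, used to derive start positions. */
       List<FromSplitT> getPreviousSplits();
   }
   ```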
   
   It's a lot of words, so I really appreciate your patience in reading this. 
Let me know if anything is unclear - I'm happy to chat more about this!





[jira] [Closed] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener

2022-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28696.
---
Resolution: Fixed

Fixed via bbaeb628f48a4bc4c324bfc4afd06ddf34f546f0

> Notify all newlyAdded/Merged blocked nodes to BlocklistListener
> ---
>
> Key: FLINK-28696
> URL: https://issues.apache.org/jira/browse/FLINK-28696
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This bug was introduced by FLINK-28660. Our newly added logic results in the 
> blocklist listeners not being notified when there are no newly added nodes 
> (only merged nodes).
>  
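
For illustration, a minimal self-contained sketch of the bug pattern described 
above (hypothetical names, not the actual Flink blocklist code):

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// If notification is gated on *newly added* nodes only, a pure merge
// (updating already-blocked nodes) never reaches the listener.
final class BlocklistNotifyExample {
    static void notifyListener(
            List<String> newlyAddedNodes,
            List<String> mergedNodes,
            Consumer<Collection<String>> listener) {
        Collection<String> changed = new ArrayList<>(newlyAddedNodes);
        changed.addAll(mergedNodes);
        // Fixed condition: fires even when there are only merged nodes.
        if (!changed.isEmpty()) {
            listener.accept(changed);
        }
    }
}
{code}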





[GitHub] [flink] zhuzhurk closed pull request #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes)

2022-07-26 Thread GitBox


zhuzhurk closed pull request #20368: [FLINK-28696][runtime] Fix the bug that 
the blocklist listeners will not be notified when there are no newly added 
nodes (only merge nodes)
URL: https://github.com/apache/flink/pull/20368





[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-07-26 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r930593185


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
 }
 }
 
+    private boolean closeGateways(
+            final long checkpointId, final Set<Integer> subtasksToCheckpoint) {
+        boolean hasCloseableGateway = false;
+        for (int subtask : subtasksToCheckpoint) {
+            SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+            if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
   If we do want to capture that bug, how about we consistently call 
`tryCloseGateway()` for every subtask and check the condition at the end of 
the loop, instead of stopping at the first subtask whose `tryCloseGateway()` 
returns false?
   
   Otherwise, suppose `subtask_1.tryCloseGateway() == false` and 
`subtask_2.tryCloseGateway() == true`; the method will not throw an exception. 
This makes the method's behavior harder to understand.
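   
   For clarity, a self-contained sketch of that suggestion (stand-in types, 
not the actual Flink classes):
   
   ```java
   import java.util.Map;
   import java.util.Set;
   
   interface Gateway {
       boolean tryCloseGateway(long checkpointId);
   }
   
   final class GatewayClosing {
       // Attempt to close every gateway first, then evaluate the combined
       // result once, instead of returning at the first failed close.
       static boolean closeGateways(
               long checkpointId, Set<Integer> subtasks, Map<Integer, Gateway> gateways) {
           int closed = 0;
           for (int subtask : subtasks) {
               if (gateways.get(subtask).tryCloseGateway(checkpointId)) {
                   closed++;
               }
           }
           if (closed != 0 && closed != subtasks.size()) {
               // Mixed results indicate a bug: gateways must close all-or-nothing.
               throw new IllegalStateException("Only some subtask gateways could be closed");
           }
           return closed > 0;
       }
   }
   ```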
   
   
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java:
##
@@ -18,20 +18,16 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
+/**
+ * An {@link OperatorEvent} sent from a subtask to its {@link OperatorCoordinator} to signal that
+ * the checkpoint of an individual task is completed.
+ */
+public class AcknowledgeCheckpointEvent implements OperatorEvent {
 
-/** Simple interface for a component that takes and sends events. */
-@FunctionalInterface
-interface EventSender {
+/** The ID of the checkpoint that this event is related to. */
+final long checkpointId;

Review Comment:
   Is there any existing example where we make a variable protected and do not 
provide an accessor method for it?
   
   If not, it seems better to keep the code style consistent with existing 
code.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java:
##
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 

[GitHub] [flink-ml] taosiyuan163 commented on pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-26 Thread GitBox


taosiyuan163 commented on PR #132:
URL: https://github.com/apache/flink-ml/pull/132#issuecomment-1196269416

   Hi @zhipeng93, I updated the following according to your comments:
   
   1. Optimize the implementation for several operators.
   2. Reformat the document.
   3. Fix the test cases.
   4. Add validation for the size of the `inputs`.
   





[jira] [Resolved] (FLINK-24434) PyFlink YARN per-job on Docker test fails on Azure

2022-07-26 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo resolved FLINK-24434.
--
Resolution: Fixed

Since https://issues.apache.org/jira/browse/FLINK-28680 has been resolved, I 
think we can close the JIRA.

> PyFlink YARN per-job on Docker test fails on Azure
> --
>
> Key: FLINK-24434
> URL: https://issues.apache.org/jira/browse/FLINK-24434
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Deployment / YARN
>Affects Versions: 1.14.4, 1.15.0, 1.16.0
>Reporter: Dawid Wysakowicz
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: stale-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24669=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=23186
> {code}
> Sep 30 18:20:22 Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
>  Permission denied: user=mapred, access=WRITE, 
> inode="/":hdfs:hadoop:drwxr-xr-x
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:318)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:219)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1663)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1647)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1606)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3039)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1079)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
> Sep 30 18:20:22   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
> Sep 30 18:20:22   at java.security.AccessController.doPrivileged(Native 
> Method)
> Sep 30 18:20:22   at javax.security.auth.Subject.doAs(Subject.java:422)
> Sep 30 18:20:22   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
> Sep 30 18:20:22 
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
> Sep 30 18:20:22   at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> Sep 30 18:20:22   at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> Sep 30 18:20:22   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> Sep 30 18:20:22   at com.sun.proxy.$Proxy12.mkdirs(Unknown Source)
> Sep 30 18:20:22   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:583)
> Sep 30 18:20:22   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Sep 30 18:20:22   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 30 18:20:22   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 30 18:20:22   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 30 18:20:22   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> Sep 30 18:20:22   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> Sep 30 18:20:22   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> Sep 30 18:20:22   at 
> 

[jira] [Assigned] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener

2022-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-28696:
---

Assignee: Lijie Wang

> Notify all newlyAdded/Merged blocked nodes to BlocklistListener
> ---
>
> Key: FLINK-28696
> URL: https://issues.apache.org/jira/browse/FLINK-28696
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This bug was introduced by FLINK-28660. Our newly added logic results in the 
> blocklist listeners not being notified when there are no newly added nodes 
> (only merged nodes).
>  





[jira] [Assigned] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-28698:


Assignee: Junhan Yang

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Assignee: Junhan Yang
>Priority: Minor
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.





[jira] [Created] (FLINK-28699) Native rocksdb full snapshot in non-incremental checkpointing

2022-07-26 Thread Lihe Ma (Jira)
Lihe Ma created FLINK-28699:
---

 Summary: Native rocksdb full snapshot in non-incremental 
checkpointing
 Key: FLINK-28699
 URL: https://issues.apache.org/jira/browse/FLINK-28699
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.15.1, 1.14.5
Reporter: Lihe Ma


When the RocksDB state backend is used and state.backend.incremental is 
enabled, Flink figures out the newly created SST files generated by RocksDB 
during a checkpoint, but reads all the states from RocksDB and writes them to 
files during a savepoint [1].

When state.backend.incremental is disabled, Flink reads all the states from 
RocksDB and generates state files for both checkpoints and savepoints [2]. 
This makes sense for savepoints, because a user can take a savepoint with the 
RocksDB state backend and then restore it using another state backend; but for 
checkpoints, the deserialization and serialization of state results in a 
performance loss.

If a native RocksDB snapshot is introduced for full snapshots, theoretically 
better performance can be achieved. At the same time, savepoints remain the 
same as before.

 
[1] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
[2] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
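
For context, a minimal sketch of how the two modes are selected today via the 
public API (the EmbeddedRocksDBStateBackend flag below is existing 1.14/1.15 
API; the proposed native full snapshot itself has no API surface yet):

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbSnapshotModeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // true  -> incremental checkpoints: newly created SST files are uploaded natively.
        // false -> full snapshots: today every state entry is deserialized and
        //          re-serialized, which is the checkpoint path this ticket
        //          proposes to replace with a native RocksDB snapshot
        //          (savepoints would stay in the canonical format).
        env.setStateBackend(new EmbeddedRocksDBStateBackend(false));
        env.enableCheckpointing(60_000);
    }
}
{code}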





[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords

2022-07-26 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571719#comment-17571719
 ] 

Hangxiang Yu commented on FLINK-28653:
--

Hi, maybe you could try to set {{env.getConfig().enableForceAvro()}} to force 
Avro, or {{env.getConfig().disableGenericTypes()}} to disable Kryo, and then 
see whether it works.
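
For example (a minimal sketch against the DataStream API):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerSwitchesExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Force Avro for types that would otherwise fall back to Kryo.
        env.getConfig().enableForceAvro();
        // Alternatively, fail fast whenever a type would be handled by Kryo,
        // which surfaces non-POJO fields (like java.time.Instant) early.
        env.getConfig().disableGenericTypes();
    }
}
{code}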

> State Schema Evolution does not work - Flink defaults to Kryo serialization 
> even for POJOs and Avro SpecificRecords
> ---
>
> Key: FLINK-28653
> URL: https://issues.apache.org/jira/browse/FLINK-28653
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Runtime / State Backends
>Affects Versions: 1.14.3, 1.15.0
> Environment: I ran the job on a Flink cluster I spun up using docker 
> compose:
> ```
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> ```
>  My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip.
> I'm running macOS Monterey Version 12.4.
>Reporter: Peleg Tsadok
>Priority: Major
>  Labels: KryoSerializer, State, avro, pojo, schema-evolution
>
> I am trying to do a POC of Flink State Schema Evolution. I am using Flink 
> 1.15.0 and Java 11 but also tested on Flink 1.14.3.
> I tried to create 3 data classes - one for each serialization type:
> 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not 
> supported for POJO serialization in Flink.
> 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, 
> `Long`, `String`. The getters, setters and constructors are generated using 
> Lombok.
> 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin.
> For each class I wrote a stream job that uses a time window to buffer 
> elements and turn them into a list.
> For each class I tried to do the following:
> 1. Run a job
> 2. Stop with savepoint
> 3. Add a field to the data class
> 4. Submit using savepoint
> For all data classes the submit with savepoint failed with this exception:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from 
> any of the 1 provided restore options.
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>     ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
>     at 
> 

[jira] [Assigned] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size

2022-07-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-28577:


Assignee: Yu Chen

> 1.15.1 web ui console report error about checkpoint size
> 
>
> Key: FLINK-28577
> URL: https://issues.apache.org/jira/browse/FLINK-28577
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.15.1
>Reporter: nobleyd
>Assignee: Yu Chen
>Priority: Major
>
> 1.15.1
> 1 start-cluster
> 2 submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar
> 3 trigger savepoint: ./bin/flink savepoint {jobId} ./sp0
> 4 open web ui for job and change to checkpoint tab, nothing showed.
> Chrome console log shows some error:
> main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null 
> (reading 'checkpointed_size')
>     at q (253.e9e8f2b56b4981f5.js:1:607974)
>     at Sl (main.a7e97c2f60a2616e.js:1:186068)
>     at Br (main.a7e97c2f60a2616e.js:1:184696)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at B8 (main.a7e97c2f60a2616e.js:1:191872)
>  
>  
>  





[jira] [Commented] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size

2022-07-26 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571720#comment-17571720
 ] 

Yun Tang commented on FLINK-28577:
--

[~Yu Chen] Thanks for figuring out this problem. It seems we cannot have tests 
covering the web UI changes :( and maybe typos could lead to such problems.

 

Already assigned this ticket to you, please go ahead.

> 1.15.1 web ui console report error about checkpoint size
> 
>
> Key: FLINK-28577
> URL: https://issues.apache.org/jira/browse/FLINK-28577
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.15.1
>Reporter: nobleyd
>Assignee: Yu Chen
>Priority: Major
>
> 1.15.1
> 1 start-cluster
> 2 submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar
> 3 trigger savepoint: ./bin/flink savepoint {jobId} ./sp0
> 4 open web ui for job and change to checkpoint tab, nothing showed.
> Chrome console log shows some error:
> main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null 
> (reading 'checkpointed_size')
>     at q (253.e9e8f2b56b4981f5.js:1:607974)
>     at Sl (main.a7e97c2f60a2616e.js:1:186068)
>     at Br (main.a7e97c2f60a2616e.js:1:184696)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at B8 (main.a7e97c2f60a2616e.js:1:191872)
>  
>  
>  





[GitHub] [flink] zoltar9264 commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …

2022-07-26 Thread GitBox


zoltar9264 commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r930584225


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##
@@ -87,9 +87,14 @@ protected <K> CheckpointableKeyedStateBackend<K> restore(
 String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
 ExecutionConfig executionConfig = env.getExecutionConfig();
 
+env.getAsyncOperationsThreadPool();
+
 ChangelogStateFactory changelogStateFactory = new 
ChangelogStateFactory();
 CheckpointableKeyedStateBackend<K> keyedStateBackend =
 ChangelogBackendRestoreOperation.restore(
+env.getJobID(),
+env.getAsyncOperationsThreadPool(),
+env.getTaskManagerInfo().getConfiguration(),

Review Comment:
   Hi @fredia, thanks for the reply. I'm not suggesting that we pass 
PERIODIC_MATERIALIZATION_INTERVAL directly. StateChangelogStorage may have 
different implementations, each with different options. I think an 
implementation-specific configuration should not be exposed in the interface.
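   
   To make the trade-off concrete, a tiny sketch (stand-in names and 
signatures, not the actual `StateChangelogStorageFactory` API):
   
   ```java
   import java.time.Duration;
   
   import org.apache.flink.configuration.Configuration;
   
   interface FactoryTakingConfiguration {
       // Keeps the interface generic: each StateChangelogStorage
       // implementation reads whatever options it needs itself.
       Object createStorageView(Configuration configuration);
   }
   
   interface FactoryTakingSpecificOption {
       // Leaks an implementation-specific setting (e.g. an interval reused
       // as a cache-idle timeout) into the shared interface.
       Object createStorageView(Duration cacheIdleTimeout);
   }
   ```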






[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571718#comment-17571718
 ] 

Yun Tang commented on FLINK-28697:
--

[~jmahonin] Did you use different flink versions between the client and server 
side? Or could you share the full exception stack trace?

> MapDataSerializer doesn't declare a serialVersionUID
> 
>
> Key: FLINK-28697
> URL: https://issues.apache.org/jira/browse/FLINK-28697
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
>
> MapDataSerializer doesn't declare a serialVersionUID, which can manifest as a 
> InvalidClassException when attempting to serialize with different JREs for 
> compilation / runtime.
> {code:java}
> Caused by: java.io.InvalidClassException: 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
> incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
> class serialVersionUID = 1622156938509929811 {code}
>  
>  
>  
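
For reference, the conventional fix is to declare the UID explicitly so 
different compilers/JREs cannot compute different defaults - a minimal sketch:

{code:java}
import java.io.Serializable;

// Illustrative class name; pinning serialVersionUID keeps serialized
// instances compatible even when different JREs would otherwise compute
// different default UIDs from the class internals.
public class PinnedVersionSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
}
{code}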





[jira] [Updated] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-28697:
-
Fix Version/s: (was: 1.15.1)

> MapDataSerializer doesn't declare a serialVersionUID
> 
>
> Key: FLINK-28697
> URL: https://issues.apache.org/jira/browse/FLINK-28697
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
>
> MapDataSerializer doesn't declare a serialVersionUID, which can manifest as a 
> InvalidClassException when attempting to serialize with different JREs for 
> compilation / runtime.
> {code:java}
> Caused by: java.io.InvalidClassException: 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
> incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
> class serialVersionUID = 1622156938509929811 {code}
>  
>  
>  





[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-26 Thread GitBox


Tartarus0zm commented on PR #20252:
URL: https://github.com/apache/flink/pull/20252#issuecomment-1196221001

   @flinkbot run azure





[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-28698:
-
Priority: Minor  (was: Major)

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Priority: Minor
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.





[jira] [Commented] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571715#comment-17571715
 ] 

Xintong Song commented on FLINK-28698:
--

Thanks [~wanglijie95], that makes sense to me.

[~junhany], could you please take a look?

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-28698:
-
Affects Version/s: 1.16.0

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change it to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.





[GitHub] [flink] PatrickRen merged pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

2022-07-26 Thread GitBox


PatrickRen merged PR #20177:
URL: https://github.com/apache/flink/pull/20177





[GitHub] [flink] LinMingQiang commented on pull request #19995: [FLINK-28060][BP 1.15][Connector/Kafka] Updated Kafka Clients to 3.1.1

2022-07-26 Thread GitBox


LinMingQiang commented on PR #19995:
URL: https://github.com/apache/flink/pull/19995#issuecomment-1196218666

   I still have problems applying this patch on 1.15.





[GitHub] [flink] fredia commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …

2022-07-26 Thread GitBox


fredia commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r930579497


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##
@@ -87,9 +87,14 @@ protected <K> CheckpointableKeyedStateBackend<K> restore(
 String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
 ExecutionConfig executionConfig = env.getExecutionConfig();
 
+env.getAsyncOperationsThreadPool();
+
 ChangelogStateFactory changelogStateFactory = new 
ChangelogStateFactory();
 CheckpointableKeyedStateBackend<K> keyedStateBackend =
 ChangelogBackendRestoreOperation.restore(
+env.getJobID(),
+env.getAsyncOperationsThreadPool(),
+env.getTaskManagerInfo().getConfiguration(),

Review Comment:
   > Do you mean changing the StateChangelogStorageFactory interface and 
passing ExecutionConfig to createStorageView instead of Configuration?
   
   I tend to put `PERIODIC_MATERIALIZATION_INTERVAL` directly as a parameter 
instead of an `xxConfiguration`.
   
   > it should be `env.getJobConfiguration()` ideally merged with 
`env.getTaskManagerInfo().getConfiguration()`, right?
   
   I'm not sure. Currently, most configuration is in 
`env.getTaskManagerInfo().getConfiguration()` or `env.getTaskConfiguration()`; 
which is better to merge?
   






[jira] [Commented] (FLINK-28577) 1.15.1 web ui console report error about checkpoint size

2022-07-26 Thread Yu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571713#comment-17571713
 ] 

Yu Chen commented on FLINK-28577:
-

Well, I think I've found the root cause of this issue: it was mainly introduced 
by a mistaken modification of flink-runtime-web in 
[FLINK-25557|https://issues.apache.org/jira/browse/FLINK-25557].

And I can propose a PR to resolve the problem.

cc: [~yunta] 

> 1.15.1 web ui console report error about checkpoint size
> 
>
> Key: FLINK-28577
> URL: https://issues.apache.org/jira/browse/FLINK-28577
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.15.1
>Reporter: nobleyd
>Priority: Major
>
> 1.15.1
> 1 start-cluster
> 2 submit job: ./bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar
> 3 trigger savepoint: ./bin/flink savepoint {jobId} ./sp0
> 4 open web ui for job and change to checkpoint tab, nothing showed.
> Chrome console log shows some error:
> main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of null 
> (reading 'checkpointed_size')
>     at q (253.e9e8f2b56b4981f5.js:1:607974)
>     at Sl (main.a7e97c2f60a2616e.js:1:186068)
>     at Br (main.a7e97c2f60a2616e.js:1:184696)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at N8 (main.a7e97c2f60a2616e.js:1:185128)
>     at Br (main.a7e97c2f60a2616e.js:1:185153)
>     at B8 (main.a7e97c2f60a2616e.js:1:191872)
>  
>  
>  





[GitHub] [flink] DavidLiu001 commented on pull request #20338: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…

2022-07-26 Thread GitBox


DavidLiu001 commented on PR #20338:
URL: https://github.com/apache/flink/pull/20338#issuecomment-1196217453

   Nice, I will follow your suggestion in the later PR.





[GitHub] [flink] DavidLiu001 commented on pull request #20338: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…

2022-07-26 Thread GitBox


DavidLiu001 commented on PR #20338:
URL: https://github.com/apache/flink/pull/20338#issuecomment-1196217231

   Nice, I will follow your suggestion in the later PR.





[GitHub] [flink] flinkbot commented on pull request #20371: [FLINK-27908][network] Introduce HsResultPartition and HsSubpartitionView

2022-07-26 Thread GitBox


flinkbot commented on PR #20371:
URL: https://github.com/apache/flink/pull/20371#issuecomment-1196216583

   
   ## CI report:
   
   * ee6783ba798cf9d8adfed4d750769614f3626240 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] masteryhx commented on a diff in pull request #20306: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression

2022-07-26 Thread GitBox


masteryhx commented on code in PR #20306:
URL: https://github.com/apache/flink/pull/20306#discussion_r930578280


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##
@@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path, 
Collection tasks) throws
 wrappedStreamClosed = true;
 }
 } finally {
-if (!wrappedStreamClosed) {
+if (!wrappedStreamClosed || compression) {
 fsStream.close();
 }

Review Comment:
   I have tried to remove it and rerun, but it still doesn't work, so that's 
not the reason.
   Actually, I found that the case will always fail whether compression is 
enabled or not.
   What confuses me is that the case will always fail only if I wrap the 
stream, even if there is no other logic.
   And it just keeps running until the timeout and doesn't throw an exception.






[GitHub] [flink] DavidLiu001 commented on pull request #20360: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…

2022-07-26 Thread GitBox


DavidLiu001 commented on PR #20360:
URL: https://github.com/apache/flink/pull/20360#issuecomment-1196214414

BUILD FAILURE: Failed to execute goal on project 
flink-connector-elasticsearch-base: Could not resolve dependencies for project 
org.apache.flink:flink-connector-elasticsearch-base:jar:1.16-SNAPSHOT: Failed 
to collect dependencies at 
org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:7.10.2 -> 
org.elasticsearch.plugin:rank-eval-client:jar:7.10.2: Failed to read artifact 
descriptor for org.elasticsearch.plugin:rank-eval-client:jar:7.10.2: Could not 
transfer artifact org.elasticsearch.plugin:rank-eval-client:pom:7.10.2 from/to 
google-maven-central 
(https://maven-central-eu.storage-download.googleapis.com/maven2/): Read timed 
out -> [Help 1]
   
   @dannycranmer It seems to be an environment issue on Azure.





[jira] [Updated] (FLINK-27908) Introduce HsResultPartition and HsSubpartitionView

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27908:
---
Labels: pull-request-available  (was: )

> Introduce HsResultPartition and HsSubpartitionView
> --
>
> Key: FLINK-27908
> URL: https://issues.apache.org/jira/browse/FLINK-27908
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] reswqa opened a new pull request, #20371: [FLINK-27908][network] Introduce HsResultPartition and HsSubpartitionView

2022-07-26 Thread GitBox


reswqa opened a new pull request, #20371:
URL: https://github.com/apache/flink/pull/20371

   ## What is the purpose of the change
   
   *Introduce HsResultPartition and HsSubpartitionView; this is the last part 
of the hybrid shuffle mode in the shuffle layer.*
   
   
   ## Brief change log
   
 - *Extend onResultPartitionClosed in HsSpillingStrategy and add lifecycle 
methods for the hybrid shuffle components.*
 - *HybridShuffleConfiguration supports setting the spilling-strategy name and 
loading the spilling strategy.*
 - *Introduce HsResultPartition and HsSubpartitionView.*
 - *ResultPartitionFactory also supports the HYBRID type, and a config option 
is introduced for the hybrid spilling strategy.*
   
   
   ## Verifying this change
   
   This change adds unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-26 Thread GitBox


Tartarus0zm commented on PR #20252:
URL: https://github.com/apache/flink/pull/20252#issuecomment-1196203437

   @flinkbo run azure





[GitHub] [flink] zoltar9264 commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …

2022-07-26 Thread GitBox


zoltar9264 commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r930567765


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorWithCache.java:
##
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL;
+
+/** StateChangeIterator with local cache. */
+class StateChangeIteratorWithCache extends StateChangeIteratorImpl {
+private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeIteratorWithCache.class);
+
+private static final String CACHE_FILE_PREFIX = "dstl-";
+
+private final File cacheDir;
+private final ConcurrentMap<Path, FileCache> cache = new ConcurrentHashMap<>();
+private final ScheduledExecutorService cacheCleanScheduler;
+private final ExecutorService downloadExecutor;
+private final long cacheIdleMillis;
+
+StateChangeIteratorWithCache(ExecutorService downloadExecutor, 
Configuration config) {
+// TODO: 2022/5/31 add a new options for cache idle
+long cacheIdleMillis = 
config.get(PERIODIC_MATERIALIZATION_INTERVAL).toMillis();
+File cacheDir = ConfigurationUtils.getRandomTempDirectory(config);
+
+this.cacheCleanScheduler =
+SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", 
LOG);
+this.downloadExecutor = downloadExecutor;
+this.cacheIdleMillis = cacheIdleMillis;
+this.cacheDir = cacheDir;
+}
+
+@Override
+public CloseableIterator<StateChange> read(StreamStateHandle handle, long offset)
+throws IOException {
+
+if (!(handle instanceof FileStateHandle)) {
+return new 
StateChangeFormat().read(wrapAndSeek(handle.openInputStream(), offset));
+}
+
+FileStateHandle fileHandle = (FileStateHandle) handle;
+DataInputStream input;
+
+if (fileHandle.getFilePath().getFileSystem().isDistributedFS()) {
+
+Path dfsPath = fileHandle.getFilePath();
+FileCache fileCache =
+cache.computeIfAbsent(
+dfsPath,
+key -> {
+FileCache fCache = new FileCache(cacheDir);
+downloadExecutor.execute(() -> 
downloadFile(fileHandle, fCache));
+return fCache;
+});
+
+FileInputStream fin = fileCache.openAndSeek(offset);
+
+input =
+new DataInputStream(new BufferedInputStream(fin)) {
+@Override
+public void close() throws IOException {
+super.close();
+if (fileCache.getRefCount() == 0) {
+

[jira] [Commented] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571694#comment-17571694
 ] 

Lijie Wang commented on FLINK-28698:


cc [~xtsong] 

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-28698:
---
Description: 
!image-2022-07-27-10-00-49-345.png|width=921,height=382!

 Currently, the display order of task state duration is INITIALIZING, CREATED, 
SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to 
CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order 
of task state transitions.

  was:
!image-2022-07-27-10-00-49-345.png|width=921,height=382!

 Currently, the display order of task state duration is INITIALIZING, CREATED, 
SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to 
CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order 
of task state 


> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-28698:
---
Description: 
!image-2022-07-27-10-00-49-345.png|width=921,height=382!

 Currently, the display order of task state duration is INITIALIZING, CREATED, 
SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable to change to 
CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which follows the order 
of task state 

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>
> !image-2022-07-27-10-00-49-345.png|width=921,height=382!
>  Currently, the display order of task state duration is INITIALIZING, 
> CREATED, SCHEDULED, RUNNING, DEPLOYING. I think it would be more reasonable 
> to change to CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, which 
> follows the order of task state 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-28698:
---
Attachment: image-2022-07-27-10-00-49-345.png

> The display order of aggregated metrics should follow the order of task state 
> transitions
> -
>
> Key: FLINK-28698
> URL: https://issues.apache.org/jira/browse/FLINK-28698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Lijie Wang
>Priority: Major
> Attachments: image-2022-07-27-10-00-49-345.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28698) The display order of aggregated metrics should follow the order of task state transitions

2022-07-26 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-28698:
--

 Summary: The display order of aggregated metrics should follow the 
order of task state transitions
 Key: FLINK-28698
 URL: https://issues.apache.org/jira/browse/FLINK-28698
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Lijie Wang
 Attachments: image-2022-07-27-10-00-49-345.png





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26902) Introduce performence tune and benchmark document for table store

2022-07-26 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571693#comment-17571693
 ] 

Jingsong Lee commented on FLINK-26902:
--

Hi [~ConradJam], you can consider, from your point of view, how having such a 
document would help people use the table store better.
For benchmarks, there is a flink-table-store-benchmark module in the repo.

> Introduce performence tune and benchmark document for table store
> -
>
> Key: FLINK-26902
> URL: https://issues.apache.org/jira/browse/FLINK-26902
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Priority: Minor
> Fix For: table-store-0.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Vancior commented on a diff in pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

2022-07-26 Thread GitBox


Vancior commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r930546050


##
flink-python/pyflink/datastream/connectors/base.py:
##
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
 super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+@abstractmethod
+def apply(self, ds):
+pass
+
+
+class PreTransformWrapper(ABC):
+
+@abstractmethod
+def need_pre_transform(self) -> bool:

Review Comment:
   This is still needed because a sink implementing `SupportsPreprocessing` may 
not require preprocessing, e.g. `KafkaSink` doesn't need preprocessing when 
using a fixed topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28692) Check warehouse path in CatalogFactory

2022-07-26 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-28692.

Resolution: Fixed

master: d30e7998b136dec0f9aded8376a516a76b467f2e
release-0.2: f149a4f349fa2ced986bec42b3d1f0cd27004343

> Check warehouse path in CatalogFactory
> --
>
> Key: FLINK-28692
> URL: https://issues.apache.org/jira/browse/FLINK-28692
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> * If the path does not exist, automatically create the directory.
> * If the path exists but is not a directory, throw an exception.
> * If the path exists and is a directory, pass.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
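
For illustration, the three checks above could look roughly like this: a
minimal sketch against Flink's FileSystem API, not the actual table store
CatalogFactory code (the method name and exception choice are assumptions):

{code:java}
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public final class WarehousePathCheck {

    /** Not exist: create; exists but not a directory: throw; directory: pass. */
    static void checkWarehousePath(Path warehouse) throws IOException {
        FileSystem fs = warehouse.getFileSystem();
        if (!fs.exists(warehouse)) {
            // Not exist: automatically create the directory.
            fs.mkdirs(warehouse);
        } else if (!fs.getFileStatus(warehouse).isDir()) {
            // Exists but is not a directory: throw.
            throw new IOException(
                    "Warehouse path " + warehouse + " exists but is not a directory");
        }
        // Exists and is a directory: nothing to do.
    }
}
{code}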


[GitHub] [flink-table-store] JingsongLi merged pull request #243: [FLINK-28692] Check warehouse path in CatalogFactory

2022-07-26 Thread GitBox


JingsongLi merged PR #243:
URL: https://github.com/apache/flink-table-store/pull/243


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tweise commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

2022-07-26 Thread GitBox


tweise commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196163399

   @xinbinhuang looking at all the modifications to HybridSource itself I think 
it is necessary to take a step back here and discuss the design aspects first.
   
   The underlying sources are a sequence of bounded sources, optionally 
followed by an unbounded source. Therefore, there should be no need to have a 
"dynamic reader" that does special things. The enumerator knows upfront which 
splits need to be processed and when it is finished.
   
   The HybridSource already supports transferring the end position to the 
next enumerator. That was part of the FLIP; the details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
 and you can find an example in the tests: 
HybridSourceITCase.sourceWithDynamicSwitchPosition
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
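
For reference, the switch-position transfer mentioned in the comment above
looks roughly like this: a sketch following the FLIP-150 documentation
example. The Kafka settings are illustrative, and `getEndTimestamp` is a
placeholder, since deriving the switch timestamp depends on the concrete
enumerator:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public final class FileThenKafka {

    static HybridSource<String> build(FileSource<String> fileSource) {
        return HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
                .addSource(
                        switchContext -> {
                            // The bounded enumerator knows where it finished;
                            // derive the Kafka start position from it.
                            StaticFileSplitEnumerator prev =
                                    switchContext.getPreviousEnumerator();
                            long endTimestamp = getEndTimestamp(prev);
                            return KafkaSource.<String>builder()
                                    .setBootstrapServers("broker:9092") // illustrative
                                    .setTopics("events") // illustrative
                                    .setValueOnlyDeserializer(new SimpleStringSchema())
                                    .setStartingOffsets(
                                            OffsetsInitializer.timestamp(endTimestamp))
                                    .build();
                        },
                        Boundedness.CONTINUOUS_UNBOUNDED)
                .build();
    }

    // Placeholder: StaticFileSplitEnumerator has no such accessor out of the box.
    private static long getEndTimestamp(StaticFileSplitEnumerator enumerator) {
        return System.currentTimeMillis();
    }
}
```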



[GitHub] [flink-kubernetes-operator] czy006 commented on pull request #308: [FLINK-28223] Add artifact-fetcher to the pod-template.yaml example

2022-07-26 Thread GitBox


czy006 commented on PR #308:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/308#issuecomment-1196161652

   @morhidi PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

2022-07-26 Thread GitBox


lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929988353


##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java:
##
@@ -0,0 +1,110 @@
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for adding remote jar. */
+public class AddRemoteJarITCase {
+@ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+

Review Comment:
   We should not introduce this test here; the local jar cases are already 
covered by existing tests. As for the remote jar test, it should be introduced 
in the e2e module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28681) The number of windows allocated by sliding windows is inaccurate, when the window length is irregular

2022-07-26 Thread nyingping (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nyingping updated FLINK-28681:
--
Description: 
In assignWindows, the initial capacity of the list of windows is determined by 
'(int) (size / slide)'.
{code:java}
List<TimeWindow> windows = new ArrayList<>((int) (size / slide)) {code}
This calculation is not accurate when the window size is not an exact multiple 
of the slide.

For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 
3, but the final number of windows is 4.

Although this does not affect functionality, I think the following would be 
better:
{code:java}
int len = (int) Math.ceil((double) size / slide);
List<TimeWindow> windows = new ArrayList<>(len); {code}
 

  was:
In assignWindows, the initial length of the list of Windows is determined by 
'(int) (size/slide)'.
{code:java}
List<TimeWindow> windows = new ArrayList<>((int) (size / slide)) {code}
This calculation is not accurate when the sliding window length is irregular.

For example, if the size is 10 and the slide is 3, '(int) (size/slide)' gives 
3, but the final number of Windows is 4.

 

Although this does not affect functionality, I think it would be better.
{code:java}
int len = (int) Math.ceil((double) (size / slide));
List<TimeWindow> windows = new ArrayList<>(len); {code}
 


> The number of windows allocated by sliding windows is inaccurate, when the 
> window length is irregular
> -
>
> Key: FLINK-28681
> URL: https://issues.apache.org/jira/browse/FLINK-28681
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.1
>Reporter: nyingping
>Priority: Minor
>
> In assignWindows, the initial capacity of the list of windows is determined by 
> '(int) (size / slide)'.
> {code:java}
> List<TimeWindow> windows = new ArrayList<>((int) (size / slide)) {code}
> This calculation is not accurate when the window size is not an exact multiple 
> of the slide.
> For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 
> 3, but the final number of windows is 4.
>  
> Although this does not affect functionality, I think the following would be 
> better:
> {code:java}
> int len = (int) Math.ceil((double) size / slide);
> List<TimeWindow> windows = new ArrayList<>(len); {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
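
To make the off-by-one above concrete, here is a self-contained sketch of the
arithmetic (plain Java, independent of the actual window assigner code):

{code:java}
public class SlidingWindowCount {
    public static void main(String[] args) {
        long size = 10;
        long slide = 3;

        // Truncating division undercounts: an element with timestamp 9
        // belongs to the windows starting at 0, 3, 6 and 9, i.e. 4 windows.
        int floorEstimate = (int) (size / slide); // 3

        // Dividing as doubles before rounding up gives the right count.
        int ceilEstimate = (int) Math.ceil((double) size / slide); // 4

        // The earlier proposal still truncated first (10 / 3 == 3), so the
        // ceil had nothing left to round up.
        int stillWrong = (int) Math.ceil((double) (size / slide)); // 3

        System.out.printf("floor=%d, ceil=%d, stillWrong=%d%n",
                floorEstimate, ceilEstimate, stillWrong);
    }
}
{code}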


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

2022-07-26 Thread GitBox


lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r92650


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
 return 
functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
 }
 
+@Override
+public void addJar(String jarPath) {

Review Comment:
   Please add UTs for `ADD JAR` and `SHOW JARS` in `TableEnvironmentTest`, and 
an IT case for `ADD JAR` in `TableEnvironmentITCase`; you can refer to the 
related tests in `FunctionITCase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
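
For orientation, once the syntax is ported, usage on the TableEnvironment side
would presumably look like the sketch below (the jar path is illustrative, and
today these statements only work in the SQL Client):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class AddJarExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a jar containing user code, then list registered jars.
        tEnv.executeSql("ADD JAR '/tmp/lower-udf.jar'");
        tEnv.executeSql("SHOW JARS").print();
    }
}
```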



[GitHub] [flink] flinkbot commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

2022-07-26 Thread GitBox


flinkbot commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1196114714

   
   ## CI report:
   
   * c584408092f780ceb187b5d90cb999a1d699b2ae UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28185:
---
Labels: pull-request-available  (was: )

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> 
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0
> Environment: Flink 1.15.0
> Kafka 2.8.1
>Reporter: Peter Schrott
>Assignee: Mason Chen
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, 
> NegativeOffsetSpec.scala
>
>
> When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty 
> partitions – little traffic + low retention – an {{IllegalArgumentException: 
> Invalid negative offset}} occurs. See the stacktrace below.
> The problem here is that the admin client returns -1 as timestamp and 
> offset for empty partitions in {{KafkaAdminClient.listOffsets(.)}}. [1] 
> Please compare the attached screenshot. When creating the 
> {{OffsetAndTimestamp}} object from the admin client response, the exception 
> is thrown.
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition 
> splits due to 
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
>     at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>     at 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
>     ... 8 common frames omitted
> 15:25:58.025 INFO  [flink-akka.actor.default-dispatcher-11] 
> o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 
> 351e440289835f2ff3e6fee31bf6e13c).
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
>     at 
> 

[GitHub] [flink] mas-chen opened a new pull request, #20370: [FLINK-28185] Make TimestampOffsetsInitializer apply offset reset str…

2022-07-26 Thread GitBox


mas-chen opened a new pull request, #20370:
URL: https://github.com/apache/flink/pull/20370

   …ategy and handle timestamps that do not map to an offset
   
   ## What is the purpose of the change
   
   This change improves the TimestampOffsetsInitializer to be initialized with 
a configured offset reset strategy. The default behavior (LATEST) is preserved. 
This also fixes a bug for when the timestamp does not correspond to an offset 
in Kafka and clarifies the exception message that is thrown.
   
   ## Brief change log
   
   - Handles EARLIEST/LATEST/NONE
   - For timestamps that do not correspond to an offset in Kafka and if 
configured with NONE, the initializer will throw an explicit exception.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added unit test to test the various offset reset strategies and the edge 
case where timestamp does not correspond to an offset in Kafka
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
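
For context, this is how the timestamp initializer is used today; a hedged
sketch where broker, topic and group id are illustrative. The PR above
additionally lets the initializer fall back to a configured offset reset
strategy when a timestamp maps to no offset; the exact new signature is not
shown here:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public final class TimestampStartExample {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test_topic")
                .setGroupId("ts-example")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Start from the first record whose timestamp is >= the given
                // epoch millis; per the PR description, timestamps that map to
                // no offset default to the latest offset.
                .setStartingOffsets(OffsetsInitializer.timestamp(1655000000000L))
                .build();
    }
}
```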



[GitHub] [flink] rkhachatryan commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …

2022-07-26 Thread GitBox


rkhachatryan commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r930493192


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorWithCache.java:
##
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL;
+
+/** StateChangeIterator with local cache. */
+class StateChangeIteratorWithCache extends StateChangeIteratorImpl {
+private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeIteratorWithCache.class);
+
+private static final String CACHE_FILE_PREFIX = "dstl-";
+
+private final File cacheDir;
+private final ConcurrentMap<Path, FileCache> cache = new ConcurrentHashMap<>();
+private final ScheduledExecutorService cacheCleanScheduler;
+private final ExecutorService downloadExecutor;
+private final long cacheIdleMillis;
+
+StateChangeIteratorWithCache(ExecutorService downloadExecutor, 
Configuration config) {
+// TODO: 2022/5/31 add a new option for cache idle
+long cacheIdleMillis = 
config.get(PERIODIC_MATERIALIZATION_INTERVAL).toMillis();
+File cacheDir = ConfigurationUtils.getRandomTempDirectory(config);
+
+this.cacheCleanScheduler =
+SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", 
LOG);
+this.downloadExecutor = downloadExecutor;
+this.cacheIdleMillis = cacheIdleMillis;
+this.cacheDir = cacheDir;
+}
+
+@Override
+public CloseableIterator<StateChange> read(StreamStateHandle handle, long offset)
+throws IOException {
+
+if (!(handle instanceof FileStateHandle)) {
+return new 
StateChangeFormat().read(wrapAndSeek(handle.openInputStream(), offset));
+}
+
+FileStateHandle fileHandle = (FileStateHandle) handle;
+DataInputStream input;
+
+if (fileHandle.getFilePath().getFileSystem().isDistributedFS()) {
+
+Path dfsPath = fileHandle.getFilePath();
+FileCache fileCache =
+cache.computeIfAbsent(
+dfsPath,
+key -> {
+FileCache fCache = new FileCache(cacheDir);
+downloadExecutor.execute(() -> 
downloadFile(fileHandle, fCache));
+return fCache;
+});
+
+FileInputStream fin = fileCache.openAndSeek(offset);
+
+input =
+new DataInputStream(new BufferedInputStream(fin)) {
+@Override
+public void close() throws IOException {
+super.close();
+if (fileCache.getRefCount() == 0) {
+

[jira] [Updated] (FLINK-27701) HashMapStateBackendWindowITCase. testAggregateWindowStateReader failed with Not all required tasks are currently running

2022-07-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27701:
---
  Labels: auto-deprioritized-major test-stability  (was: stale-major 
test-stability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> HashMapStateBackendWindowITCase. testAggregateWindowStateReader failed with  
> Not all required tasks are currently running
> -
>
> Key: FLINK-27701
> URL: https://issues.apache.org/jira/browse/FLINK-27701
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-05-19T11:04:27.4331524Z May 19 11:04:27 [ERROR] Tests run: 9, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 29.034 s <<< FAILURE! - in 
> org.apache.flink.state.api.HashMapStateBackendWindowITCase
> 2022-05-19T11:04:27.4333055Z May 19 11:04:27 [ERROR] 
> org.apache.flink.state.api.HashMapStateBackendWindowITCase.testAggregateWindowStateReader
>   Time elapsed: 0.105 s  <<< ERROR!
> 2022-05-19T11:04:27.4333765Z May 19 11:04:27 java.lang.RuntimeException: 
> Failed to take savepoint
> 2022-05-19T11:04:27.4334405Z May 19 11:04:27  at 
> org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:68)
> 2022-05-19T11:04:27.4335375Z May 19 11:04:27  at 
> org.apache.flink.state.api.SavepointWindowReaderITCase.testAggregateWindowStateReader(SavepointWindowReaderITCase.java:149)
> 2022-05-19T11:04:27.4338106Z May 19 11:04:27  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-05-19T11:04:27.4339140Z May 19 11:04:27  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-05-19T11:04:27.4339854Z May 19 11:04:27  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-05-19T11:04:27.4340560Z May 19 11:04:27  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-05-19T11:04:27.4341746Z May 19 11:04:27  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-05-19T11:04:27.4342797Z May 19 11:04:27  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-05-19T11:04:27.4343717Z May 19 11:04:27  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-05-19T11:04:27.4344909Z May 19 11:04:27  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-05-19T11:04:27.4345993Z May 19 11:04:27  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-05-19T11:04:27.4346981Z May 19 11:04:27  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-05-19T11:04:27.4347590Z May 19 11:04:27  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-05-19T11:04:27.4348200Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-05-19T11:04:27.4348856Z May 19 11:04:27  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-05-19T11:04:27.4349484Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-05-19T11:04:27.4350118Z May 19 11:04:27  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-05-19T11:04:27.4350899Z May 19 11:04:27  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-05-19T11:04:27.4352057Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-05-19T11:04:27.4353154Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-05-19T11:04:27.4354153Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-05-19T11:04:27.4354936Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-05-19T11:04:27.4355560Z May 19 11:04:27  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-05-19T11:04:27.4356167Z May 19 11:04:27  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-05-19T11:04:27.4356775Z May 19 11:04:27  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-05-19T11:04:27.4357358Z May 19 11:04:27  at 
> 

[jira] [Updated] (FLINK-27695) KafkaTransactionLogITCase failed on azure due to Could not find a valid Docker environment

2022-07-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27695:
---
  Labels: auto-deprioritized-major test-stability  (was: stale-major 
test-stability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> KafkaTransactionLogITCase failed on azure due to Could not find a valid 
> Docker environment
> --
>
> Key: FLINK-27695
> URL: https://issues.apache.org/jira/browse/FLINK-27695
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Huang Xingbo
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 022-05-19T02:04:23.9190098Z May 19 02:04:23 [ERROR] Tests run: 1, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 7.404 s <<< FAILURE! - in 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase
> 2022-05-19T02:04:23.9191182Z May 19 02:04:23 [ERROR] 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase  Time 
> elapsed: 7.404 s  <<< ERROR!
> 2022-05-19T02:04:23.9192250Z May 19 02:04:23 java.lang.IllegalStateException: 
> Could not find a valid Docker environment. Please see logs and check 
> configuration
> 2022-05-19T02:04:23.9193144Z May 19 02:04:23  at 
> org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$getFirstValidStrategy$4(DockerClientProviderStrategy.java:156)
> 2022-05-19T02:04:23.9194653Z May 19 02:04:23  at 
> java.util.Optional.orElseThrow(Optional.java:290)
> 2022-05-19T02:04:23.9196179Z May 19 02:04:23  at 
> org.testcontainers.dockerclient.DockerClientProviderStrategy.getFirstValidStrategy(DockerClientProviderStrategy.java:148)
> 2022-05-19T02:04:23.9197995Z May 19 02:04:23  at 
> org.testcontainers.DockerClientFactory.getOrInitializeStrategy(DockerClientFactory.java:146)
> 2022-05-19T02:04:23.9199486Z May 19 02:04:23  at 
> org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:188)
> 2022-05-19T02:04:23.9200666Z May 19 02:04:23  at 
> org.testcontainers.DockerClientFactory$1.getDockerClient(DockerClientFactory.java:101)
> 2022-05-19T02:04:23.9202109Z May 19 02:04:23  at 
> com.github.dockerjava.api.DockerClientDelegate.authConfig(DockerClientDelegate.java:107)
> 2022-05-19T02:04:23.9203065Z May 19 02:04:23  at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:316)
> 2022-05-19T02:04:23.9204641Z May 19 02:04:23  at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
> 2022-05-19T02:04:23.9205765Z May 19 02:04:23  at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> 2022-05-19T02:04:23.9206568Z May 19 02:04:23  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-05-19T02:04:23.9207497Z May 19 02:04:23  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-05-19T02:04:23.9208246Z May 19 02:04:23  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-05-19T02:04:23.9208887Z May 19 02:04:23  at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 2022-05-19T02:04:23.9209691Z May 19 02:04:23  at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> 2022-05-19T02:04:23.9210490Z May 19 02:04:23  at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> 2022-05-19T02:04:23.9211246Z May 19 02:04:23  at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> 2022-05-19T02:04:23.9211989Z May 19 02:04:23  at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 2022-05-19T02:04:23.9212682Z May 19 02:04:23  at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> 2022-05-19T02:04:23.9213391Z May 19 02:04:23  at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> 2022-05-19T02:04:23.9214305Z May 19 02:04:23  at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 2022-05-19T02:04:23.9215044Z May 19 02:04:23  at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 2022-05-19T02:04:23.9215809Z May 19 02:04:23  at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 2022-05-19T02:04:23.9216576Z May 19 02:04:23  at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 2022-05-19T02:04:23.9217523Z May 19 02:04:23  at 
> 

[jira] [Updated] (FLINK-22741) Hide Flink complexity from Stateful Functions

2022-07-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22741:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Hide Flink complexity from Stateful Functions
> -
>
> Key: FLINK-22741
> URL: https://issues.apache.org/jira/browse/FLINK-22741
> Project: Flink
>  Issue Type: New Feature
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0
>Reporter: Stephan Ewen
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> This is an umbrella issue for various issues to hide and reduce the 
> complexity and surface area (and configuration space) of Apache Flink when 
> using Stateful Functions.
> The goal of this is to create a setup and configuration that works robustly 
> in the vast majority of settings. Users should not be required to configure 
> anything Flink-specific, but only general parameters (like for example total 
> memory size) and StateFun specific parameters (like request timeouts and 
> batching).
> If this happens at the cost of some minor regression in peak stream 
> throughput, we can most likely stomach that in StateFun, because the 
> performance cost is commonly dominated by the interaction between StateFun 
> cluster and remote function deployments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27775) FlinkKafkaProducer VS KafkaSink

2022-07-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27775:
---
Labels: features stale-major  (was: features)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is still Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27775
> URL: https://issues.apache.org/jira/browse/FLINK-27775
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.3
>Reporter: Jiangfei Liu
>Priority: Major
>  Labels: features, stale-major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> with FlinkKafkaProducer, it completed in 7s;
> with KafkaSink, it completed in 1m40s.
> Why is KafkaSink so slow?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase.testSlidingTimeWindow failed with restore

2022-07-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28440:
---
Labels: stale-critical test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither itself nor its Sub-Tasks have been 
updated for 14 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.


> EventTimeWindowCheckpointingITCase.testSlidingTimeWindow failed with restore
> 
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: stale-critical, test-stability
>
> {code:java}
> 2022-07-07T03:27:47.5779102Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-07-07T03:27:47.5779722Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-07-07T03:27:47.5780444Z  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> 2022-07-07T03:27:47.5781338Z  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2022-07-07T03:27:47.5781955Z  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2022-07-07T03:27:47.5782587Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-07-07T03:27:47.5783184Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-07-07T03:27:47.5783843Z  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
> 2022-07-07T03:27:47.5784599Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-07-07T03:27:47.5785284Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-07-07T03:27:47.5785907Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-07-07T03:27:47.5786528Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-07-07T03:27:47.5787121Z  at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
> 2022-07-07T03:27:47.5787874Z  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> 2022-07-07T03:27:47.5788498Z  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 2022-07-07T03:27:47.5789265Z  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> 2022-07-07T03:27:47.5789968Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-07-07T03:27:47.5790582Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-07-07T03:27:47.5791198Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-07-07T03:27:47.5791799Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-07-07T03:27:47.5792351Z  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> 2022-07-07T03:27:47.5793075Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:300)
> 2022-07-07T03:27:47.5793572Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:297)
> 2022-07-07T03:27:47.5794075Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> 2022-07-07T03:27:47.5794586Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> 2022-07-07T03:27:47.5795094Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2022-07-07T03:27:47.5795654Z  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> 2022-07-07T03:27:47.5796307Z  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2022-07-07T03:27:47.5796922Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 2022-07-07T03:27:47.5797574Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> 2022-07-07T03:27:47.5798196Z  at 
> 

[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-07-26 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571628#comment-17571628
 ] 

David Anderson commented on FLINK-28060:


[~martijnvisser] [~renqs] [~mason6345] Are we any closer to understanding this 
issue? Do we know how to fix it, either for a 1.15.2 release, or for 1.16.0?

> Kafka Commit on checkpointing fails repeatedly after a broker restart
> -
>
> Key: FLINK-28060
> URL: https://issues.apache.org/jira/browse/FLINK-28060
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.15.0
> Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
>Reporter: Christian Lorenz
>Priority: Major
>  Labels: pull-request-available
> Attachments: flink-kafka-testjob.zip
>
>
> When Kafka offset committing is enabled and done on Flink's checkpointing, an 
> error might occur if one Kafka broker is shut down, since it might be the 
> leader of that partition in Kafka's internal __consumer_offsets topic.
> This is expected behaviour. But once the broker is started up again, the 
> next checkpoint issued by Flink should commit the offsets processed in the 
> meantime back to Kafka. In Flink 1.15.0 this no longer always happens, and 
> offset committing stays broken. A warning like the following will be logged 
> on each checkpoint:
> {code}
> [info] 14:33:13.684 WARN  [Source Data Fetcher for Source: input-kafka-source 
> -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - 
> Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: 
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> [info] Caused by: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.
> {code}
> To reproduce this I've attached a small Flink job program. To execute it, 
> Java 8, Scala sbt and docker / docker-compose are required. Also see 
> readme.md for more details.
> The job can be run with `sbt run`; the Kafka cluster is started by 
> `docker-compose up`. If the Kafka brokers are then restarted gracefully, 
> e.g. by `docker-compose stop kafka1` and `docker-compose start kafka1`, with 
> kafka2 and kafka3 afterwards, this warning will occur and no offsets will be 
> committed to Kafka.
> This is not reproducible in flink 1.14.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
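
A minimal job showing the configuration involved in this report; a sketch, not
the attached test program (broker, topic and group id are illustrative, and
commit.offsets.on.checkpoint already defaults to true once a group id is set):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CommitOnCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are committed back to Kafka as part of each checkpoint.
        env.enableCheckpointing(10_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input")
                .setGroupId("flink-kafka-testjob")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Shown only to make the reported behavior explicit.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "input-kafka-source")
                .print();
        env.execute();
    }
}
{code}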


[jira] [Closed] (FLINK-27692) Support local recovery for materialized part(write, restore, discard)

2022-07-26 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-27692.
-
  Assignee: Yanfei Lei
Resolution: Fixed

Merged into master as 
aef75be34d99c737f5c565703a971027ac44f855..52519a8eb695c9523c546439c66910b15f19be20
Thanks for the contribution [~Yanfei Lei]!

> Support local recovery for materialized part(write, restore, discard)
> -
>
> Key: FLINK-27692
> URL: https://issues.apache.org/jira/browse/FLINK-27692
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Support local recovery for materialized part(write, restore, discard)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan merged pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog

2022-07-26 Thread GitBox


rkhachatryan merged PR #19907:
URL: https://github.com/apache/flink/pull/19907


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dannycranmer commented on pull request #20360: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…

2022-07-26 Thread GitBox


dannycranmer commented on PR #20360:
URL: https://github.com/apache/flink/pull/20360#issuecomment-1195928597

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487
 ] 

Hongbo edited comment on FLINK-28693 at 7/26/22 5:49 PM:
-

Swapping flink-table-planner-loader with flink-table-planner in the opt/ folder 
can solve the problem.

Is there any drawback to the swap? (If not, why use flink-table-planner-loader 
as the default? I saw others also report problems with this lib.)


was (Author: liuhb86):
Swapping replacing flink-table-planner-loader with flink-table-planer in the 
opt/ folder can solve the problem.

Is there any drawback to the swap? (If not, why use flink-table-planner-loader 
as the default, as I saw others also reported problems about this lib).

> Codegen failed if the watermark is defined on a columnByExpression
> --
>
> Key: FLINK-28693
> URL: https://issues.apache.org/jira/browse/FLINK-28693
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.1
>Reporter: Hongbo
>Priority: Major
>
> The following code will throw an exception:
>  
> {code:java}
> Table program cannot be compiled. This is a bug. Please file an issue.
>  ...
>  Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 54: Cannot determine simple type name "org" {code}
> {color:#00}Code:{color}
> {code:java}
> public class TestUdf extends  ScalarFunction {
> @DataTypeHint("TIMESTAMP(3)")
> public LocalDateTime eval(String strDate) {
>return LocalDateTime.now();
> }
> }
> public class FlinkTest {
> @Test
> void testUdf() throws Exception {
> //var env = StreamExecutionEnvironment.createLocalEnvironment();
> // run `gradlew shadowJar` first to generate the uber jar.
> // It contains the kafka connector and a dummy UDF function.
> var env = 
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
> "build/libs/flink-test-all.jar");
> env.setParallelism(1);
> var tableEnv = StreamTableEnvironment.create(env);
> tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);
> var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
> .schema(Schema.newBuilder()
> .column("time_stamp", DataTypes.STRING())
> .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
> .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
> .build())
> // the kafka server doesn't need to exist. It fails in the 
> compile stage before fetching data.
> .option("properties.bootstrap.servers", "localhost:9092")
> .option("topic", "test_topic")
> .option("format", "json")
> .option("scan.startup.mode", "latest-offset")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var query = tableEnv.sqlQuery("select * from test");
> var tableResult = 
> query.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> }
> }{code}
> What does the code do?
>  # read a stream from Kafka
>  # create a derived column using a UDF expression
>  # define the watermark based on the derived column
> The full callstack:
>  
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> 

[GitHub] [flink] rkhachatryan commented on a diff in pull request #20306: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression

2022-07-26 Thread GitBox


rkhachatryan commented on code in PR #20306:
URL: https://github.com/apache/flink/pull/20306#discussion_r930161714


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##
@@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path, 
Collection tasks) throws
 wrappedStreamClosed = true;
 }
 } finally {
-if (!wrappedStreamClosed) {
+if (!wrappedStreamClosed || compression) {
 fsStream.close();
 }

Review Comment:
   An alternative could be to remove
   ```
   OutputStream compressed =
   compression ? instance.decorateWithCompression(fsStream) : fsStream;
   ```
   
   Could you check if that causes the build to fail?





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-26853) HeapStateBackend ignores metadata updates in certain cases

2022-07-26 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-26853.
-
Fix Version/s: 1.16.0
   Resolution: Fixed

Merged into master as 8df50536ef913b63620d896423c39cdd01941c55.

> HeapStateBackend ignores metadata updates in certain cases
> --
>
> Key: FLINK-26853
> URL: https://issues.apache.org/jira/browse/FLINK-26853
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.4, 1.15.0, 1.16.0
>Reporter: Roman Khachatryan
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> On recovery, HeapRestoreOperation reads state handles one by one;
>  * each handle contains metadata at the beginning;
>  * the metadata is always read, but not actually used if a state with the 
> corresponding name was already registered
> In a rare case of downscaling + multiple checkpoints with different metadata,
> this might lead to data being deserialized incorrectly (always using the
> initial metadata).
> It also prevents incremental checkpoints with schema evolution.
> On first access, however, the backend itself will update (merge) the metadata,
> so that it doesn't affect new state updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan merged pull request #20268: [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed

2022-07-26 Thread GitBox


rkhachatryan merged PR #20268:
URL: https://github.com/apache/flink/pull/20268


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog

2022-07-26 Thread GitBox


rkhachatryan commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r930115553


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -407,8 +408,15 @@ public RunnableFuture> 
snapshot(
 
metrics.reportSnapshotResult(snapshotResult))
 .thenApply(
 snapshotResult ->
-SnapshotResult.of(
-
snapshotResult.getJobManagerOwnedSnapshot(;
+snapshotResult.getTaskLocalSnapshot() 
== null
+|| snapshotResult
+
.getJobManagerOwnedSnapshot()
+== null
+? SnapshotResult.of(
+
snapshotResult.getJobManagerOwnedSnapshot())
+: 
SnapshotResult.withLocalState(
+
snapshotResult.getJobManagerOwnedSnapshot(),
+
snapshotResult.getTaskLocalSnapshot(;

Review Comment:
   nit: there is no need to re-create the result object, and more importantly 
this is less readable than just casting:
   ```
   .thenApply(this::castSnapshotResult));
   ...
   @SuppressWarnings("unchecked")
   private SnapshotResult 
castSnapshotResult(SnapshotResult snapshotResult) {
   return (SnapshotResult) snapshotResult;
   }
   
   ```






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #20355: [FLINK-28634][json] Add simple JsonSerDeSchema

2022-07-26 Thread GitBox


zentol commented on PR #20355:
URL: https://github.com/apache/flink/pull/20355#issuecomment-1195637892

   > It's very common for deserializers to include an implementation of 
getProducedType.
   
   We extend the `AbstractDeserializationSchema` which takes care of that based 
on the class/typeinfo passed to the constructor.
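
   As an illustration of that pattern, a hedged sketch (`SomePojo` is a 
placeholder for any Jackson-mappable type; this is not the PR's actual code):
   ```java
   import java.io.IOException;

   import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

   import com.fasterxml.jackson.databind.ObjectMapper;

   class SomePojo { // placeholder payload type
       public String name;
   }

   class SomePojoDeserializationSchema extends AbstractDeserializationSchema<SomePojo> {

       private transient ObjectMapper mapper;

       SomePojoDeserializationSchema() {
           super(SomePojo.class); // type info captured here; getProducedType() is inherited
       }

       @Override
       public void open(InitializationContext context) {
           mapper = new ObjectMapper(); // created at runtime, need not be serializable
       }

       @Override
       public SomePojo deserialize(byte[] message) throws IOException {
           return mapper.readValue(message, SomePojo.class);
       }
   }
   ```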


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #20369: [FLINK-28697][table] MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread GitBox


flinkbot commented on PR #20369:
URL: https://github.com/apache/flink/pull/20369#issuecomment-1195631087

   
   ## CI report:
   
   * 410ffd4ee14e36e442595526a503e214bd03ba79 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28697:
---
Labels: pull-request-available  (was: )

> MapDataSerializer doesn't declare a serialVersionUID
> 
>
> Key: FLINK-28697
> URL: https://issues.apache.org/jira/browse/FLINK-28697
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.1
>
>
> MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an
> InvalidClassException when attempting to serialize with different JREs for
> compilation / runtime.
> {code:java}
> Caused by: java.io.InvalidClassException: 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
> incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
> class serialVersionUID = 1622156938509929811 {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jmahonin opened a new pull request, #20369: [FLINK-28697][table] MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread GitBox


jmahonin opened a new pull request, #20369:
URL: https://github.com/apache/flink/pull/20369

   ## What is the purpose of the change
   
   *MapDataSerializer doesn't declare a serialVersionUID, which can manifest as 
an InvalidClassException when attempting to serialize with different JREs for 
compilation / runtime.
   
   
   ## Brief change log
   
 - *Set serialVersionUID to 1L, as with the other TypeSerializers*
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
- *Verified manually that this solves the serialization mismatch*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: don't know
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] alpinegizmo commented on a diff in pull request #20355: [FLINK-28634][json] Add simple JsonSerDeSchema

2022-07-26 Thread GitBox


alpinegizmo commented on code in PR #20355:
URL: https://github.com/apache/flink/pull/20355#discussion_r930088762


##
docs/content.zh/docs/connectors/datastream/formats/json.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+
+
+# Json format
+
+To use the JSON format you need to add the Flink JSON dependency to your 
project:
+
+```xml
+
+   org.apache.flink
+   flink-json
+   {{< version >}}
+   provided
+
+```
+
+Flink supports reading/writing JSON records via the 
`JsonSerializationSchema/JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and 
support any type that is supported by Jackson, including, but not limited to, 
`POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports 
the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a 
`POJO`:
+
+```java
+JsonDeserializationSchema jsonFormat = new 
JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource source =
+KafkaSource.builder()
+.setValueOnlyDeserializer(jsonFormat)
+...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the 
`SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema jsonFormat = new JsonSerializationSchema<>();
+KafkaSink source =
+KafkaSink.builder()
+.setRecordSerializer(
+new KafkaRecordSerializationSchemaBuilder<>()
+.setValueSerializationSchema(jsonFormat)
+...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a 
`SerializableSupplier`, acting a factory for object mappers.

Review Comment:
   ```suggestion
   Both schemas have constructors that accept a 
`SerializableSupplier`, acting as a factory for object mappers.
   ```



##
docs/content/docs/connectors/datastream/formats/json.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+
+
+# Json format
+
+To use the JSON format you need to add the Flink JSON dependency to your 
project:
+
+```xml
+
+   org.apache.flink
+   flink-json
+   {{< version >}}
+   provided
+
+```
+
+Flink supports reading/writing JSON records via the 
`JsonSerializationSchema/JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and 
support any type that is supported by Jackson, including, but not limited to, 
`POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports 
the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a 
`POJO`:
+
+```java
+JsonDeserializationSchema jsonFormat=new 
JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource source=
+KafkaSource.builder()
+.setValueOnlyDeserializer(jsonFormat)
+...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the 
`SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema jsonFormat=new JsonSerializationSchema<>();
+KafkaSink source  = 
+KafkaSink.builder()
+.setRecordSerializer(
+new KafkaRecordSerializationSchemaBuilder<>()
+.setValueSerializationSchema(jsonFormat)
+...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a 
`SerializableSupplier`, acting a factory for object mappers.
+With this factory you gain full control over the created mapper, and can 
enable/disable various Jackson features or register modules to extend the set 
of supported types or add additional functionality.

Review Comment:
   ```suggestion
   With this factory you gain full control over the created mapper, and you can 
enable/disable various Jackson features or register modules to extend the set 
of supported types or add additional functionality.
   ```
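
   As an aside, a hedged usage sketch of the supplier-based constructor this 
paragraph describes (the class name and import are assumptions based on the 
docs text in this PR, and `SomePojo` is a placeholder):
   ```java
   import com.fasterxml.jackson.databind.json.JsonMapper;

   import org.apache.flink.formats.json.JsonSerializationSchema;

   class SomePojo { // placeholder payload type
       public String name;
   }

   class CustomMapperExample {
       static JsonSerializationSchema<SomePojo> schema() {
           // The supplier runs where the schema is used, so the mapper itself
           // never needs to be serialized; only the serializable supplier does.
           return new JsonSerializationSchema<>(
                   () -> JsonMapper.builder()
                           .findAndAddModules() // e.g. pick up java.time support
                           .build());
       }
   }
   ```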



##
docs/content.zh/docs/connectors/datastream/formats/json.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+
+
+# Json format
+
+To use the JSON format you need to add the Flink JSON dependency to your 
project:
+
+```xml
+
+   org.apache.flink
+   flink-json
+   {{< version >}}
+   provided
+
+```
+
+Flink supports reading/writing JSON records via the 
`JsonSerializationSchema/JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and 
support any type that is supported by Jackson, including, but not limited to, 
`POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports 
the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a 
`POJO`:
+
+```java
+JsonDeserializationSchema jsonFormat = new 

[GitHub] [flink] flinkbot commented on pull request #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nodes)

2022-07-26 Thread GitBox


flinkbot commented on PR #20368:
URL: https://github.com/apache/flink/pull/20368#issuecomment-1195615756

   
   ## CI report:
   
   * ec687cbbb821644dc86a3642d62500e5158d8970 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread Josh Mahonin (Jira)
Josh Mahonin created FLINK-28697:


 Summary: MapDataSerializer doesn't declare a serialVersionUID
 Key: FLINK-28697
 URL: https://issues.apache.org/jira/browse/FLINK-28697
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.15.1
Reporter: Josh Mahonin
 Fix For: 1.15.1


MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an 
InvalidClassException when attempting to serialize with different JREs for 
compilation / runtime.
{code:java}
Caused by: java.io.InvalidClassException: 
org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
class serialVersionUID = 1622156938509929811 {code}
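
For readers unfamiliar with the failure mode, a minimal illustration (the class 
name is a placeholder, not the actual Flink serializer): when a Serializable 
class does not pin serialVersionUID, the JVM derives one from the class bytes, 
and two JREs can disagree.
{code:java}
import java.io.Serializable;

// Without the explicit field below, the JVM computes a serialVersionUID from
// class internals, which may differ across compilers/JRE versions, making
// deserialization fail with InvalidClassException. Pinning it keeps the
// serialized form compatible.
public class ExampleSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
}
{code}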
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28696:
---
Labels: pull-request-available  (was: )

> Notify all newlyAdded/Merged blocked nodes to BlocklistListener
> ---
>
> Key: FLINK-28696
> URL: https://issues.apache.org/jira/browse/FLINK-28696
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This bug was introduced by FLINK-28660. The newly added logic means that the 
> blocklist listener will not be notified when there are no newly added nodes 
> (only merged nodes).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487
 ] 

Hongbo commented on FLINK-28693:


Replacing flink-table-planner-loader with flink-table-planner in the 
opt/ folder can solve the problem.

Is there any drawback to the swap? (If not, why use flink-table-planner-loader 
as the default? I've seen others report problems with this lib as well.)

> Codegen failed if the watermark is defined on a columnByExpression
> --
>
> Key: FLINK-28693
> URL: https://issues.apache.org/jira/browse/FLINK-28693
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.1
>Reporter: Hongbo
>Priority: Major
>
> The following code will throw an exception:
>  
> {code:java}
> Table program cannot be compiled. This is a bug. Please file an issue.
>  ...
>  Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 54: Cannot determine simple type name "org" {code}
> {color:#00}Code:{color}
> {code:java}
> public class TestUdf extends  ScalarFunction {
> @DataTypeHint("TIMESTAMP(3)")
> public LocalDateTime eval(String strDate) {
>return LocalDateTime.now();
> }
> }
> public class FlinkTest {
> @Test
> void testUdf() throws Exception {
> //var env = StreamExecutionEnvironment.createLocalEnvironment();
> // run `gradlew shadowJar` first to generate the uber jar.
> // It contains the kafka connector and a dummy UDF function.
> var env = 
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
> "build/libs/flink-test-all.jar");
> env.setParallelism(1);
> var tableEnv = StreamTableEnvironment.create(env);
> tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);
> var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
> .schema(Schema.newBuilder()
> .column("time_stamp", DataTypes.STRING())
> .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
> .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
> .build())
> // the kafka server doesn't need to exist. It fails in the 
> compile stage before fetching data.
> .option("properties.bootstrap.servers", "localhost:9092")
> .option("topic", "test_topic")
> .option("format", "json")
> .option("scan.startup.mode", "latest-offset")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var query = tableEnv.sqlQuery("select * from test");
> var tableResult = 
> query.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> }
> }{code}
> What does the code do?
>  # read a stream from Kafka
>  # create a derived column using a UDF expression
>  # define the watermark based on the derived column
> The full callstack:
>  
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> 

[GitHub] [flink] wanglijie95 opened a new pull request, #20368: [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be notified when there are no newly added nodes (only merge nod

2022-07-26 Thread GitBox


wanglijie95 opened a new pull request, #20368:
URL: https://github.com/apache/flink/pull/20368

   ## What is the purpose of the change
   [FLINK-28696][runtime] Fix the bug that the blocklist listeners will not be 
notified when there are no newly added nodes (only merge nodes)
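
   A rough sketch of the intended behavior (all types and names below are 
invented for illustration; the real logic lives in the blocklist handler): 
listeners must be notified about every node whose record changed, whether it 
was newly added or merged into an existing entry.
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Illustrative-only model of the handler logic.
   class BlocklistSketch {
       interface Listener {
           void onBlockedNodesChanged(List<String> nodeIds);
       }

       private final Map<String, Long> blockedUntil = new HashMap<>(); // nodeId -> end timestamp
       private final List<Listener> listeners = new ArrayList<>();

       void addNewBlockedNodes(Map<String, Long> nodes) {
           List<String> changed = new ArrayList<>();
           nodes.forEach((id, until) -> {
               Long previous = blockedUntil.get(id);
               if (previous == null) {
                   blockedUntil.put(id, until);
                   changed.add(id); // newly added node
               } else if (until > previous) {
                   blockedUntil.put(id, until);
                   changed.add(id); // merged node: previously not notified
               }
           });
           if (!changed.isEmpty()) {
               listeners.forEach(l -> l.onBlockedNodesChanged(changed));
           }
       }
   }
   ```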
   
   
   ## Verifying this change
   `DefaultBlocklistHandlerTest#testAddNewBlockedNodes`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #308: [FLINK-28223] Add artifact-fetcher to the pod-template.yaml example

2022-07-26 Thread GitBox


morhidi commented on code in PR #308:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/308#discussion_r930060228


##
examples/pod-template-artifact-fetcher.yaml:
##
@@ -0,0 +1,84 @@
+

Review Comment:
   Pls remove this file, this is just a duplicate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28592) Custom resource counter metrics improvements

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28592:
---
Labels: pull-request-available  (was: )

> Custom resource counter metrics improvements
> 
>
> Key: FLINK-28592
> URL: https://issues.apache.org/jira/browse/FLINK-28592
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> * add counters at global level
>  * add option to turn on/off the metrics



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #331: [FLINK-28592] Custom resource counter metrics improvements

2022-07-26 Thread GitBox


morhidi opened a new pull request, #331:
URL: https://github.com/apache/flink-kubernetes-operator/pull/331

   ## What is the purpose of the change
   - add option to turn on/off the base custom resource metrics
   - some additional refactor around the `MetricManager`
   ## Brief change log
   - added `kubernetes.operator.resource.metrics.enabled` to turn on/off the 
metrics
   - added `TestingMetricListener` utility class
   - refactored the `MetricManager` to use registered `CustomResourceMetrics` 
interfaces
   - fixed generic types on `StatusRecorder`
   
   ## Verifying this change
   - covered by improved `FlinkDeploymentMetricsTest`, 
`FlinkSessionJobMetricsTest`
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   docs added for `kubernetes.operator.resource.metrics.enabled`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28680) No space left on device on Azure e2e_2_ci tests

2022-07-26 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571471#comment-17571471
 ] 

Robert Metzger commented on FLINK-28680:


Sorry, I wasn't sure whether 1.14 is still maintained.

Merged to release-1.14 in 274040dafad472cfe2a88e1e3038258545f1a2ef.

> No space left on device on Azure e2e_2_ci tests
> ---
>
> Key: FLINK-28680
> URL: https://issues.apache.org/jira/browse/FLINK-28680
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.14.5, 1.16.0
>Reporter: Huang Xingbo
>Assignee: Robert Metzger
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> Many e2e tests failed due to insufficient disk space. We previously freed 
> space by cleaning up the flink-e2e directory, but at the moment this is not 
> enough to solve the problem. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] maosuhan commented on a diff in pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2022-07-26 Thread GitBox


maosuhan commented on code in PR #14376:
URL: https://github.com/apache/flink/pull/14376#discussion_r930035338


##
flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE:
##
@@ -0,0 +1,9 @@
+flink-sql-protobuf
+Copyright 2014-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)

Review Comment:
   @libenchao thanks for your comment. I have fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #20367: [FLINK-19589][Filesystem][S3] Support per-connector FileSystem configuration

2022-07-26 Thread GitBox


flinkbot commented on PR #20367:
URL: https://github.com/apache/flink/pull/20367#issuecomment-1195554304

   
   ## CI report:
   
   * 8c636d1ba0cb8564474bcaaec4e8996111bdb7cd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26902) Introduce performence tune and benchmark document for table store

2022-07-26 Thread ConradJam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571460#comment-17571460
 ] 

ConradJam commented on FLINK-26902:
---

Hi [~lzljs3620320] I would like to take this ticket as my first commit to table 
store :). Where should I start for the performance tuning and benchmark 
information?

> Introduce performence tune and benchmark document for table store
> -
>
> Key: FLINK-26902
> URL: https://issues.apache.org/jira/browse/FLINK-26902
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Priority: Minor
> Fix For: table-store-0.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-26 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571456#comment-17571456
 ] 

Josh Mahonin commented on FLINK-19589:
--

PR is open. Note that this only addresses dynamic configuration for the S3 
connector. I'm happy to retarget to another ticket if that's more appropriate, 
or if there are suggestions to generalize this more broadly, that's fine too.

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, 
> in many cases, users would like to configure them in a more fine-grained way.
> Either we allow a properties map similar to Kafka or Kinesis properties to 
> our connectors.
> Or something like:
> Management of two properties related S3 Object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_: Some related existing properties can be set via the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink if 
> we decide to go this way). However, it does not allow for a sink and checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possibly as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19589:
---
Labels: pull-request-available  (was: )

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, 
> in many cases, users would like to configure them in a more fine-grained way.
> Either we allow a properties map similar to Kafka or Kinesis properties to 
> our connectors.
> Or something like:
> Management of two properties related S3 Object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_: Some related existing properties can be set via the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink if 
> we decide to go this way). However, it does not allow for a sink and checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possibly as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jmahonin opened a new pull request, #20367: [FLINK-19589][Filesystem][S3] Support per-connector FileSystem configuration

2022-07-26 Thread GitBox


jmahonin opened a new pull request, #20367:
URL: https://github.com/apache/flink/pull/20367

   ## What is the purpose of the change
   
   Introduces dynamic filesystem configuration for S3
   
   ## Brief change log
   
 - *Add URI query parameter parsing and apply `fs.s3a.` entries to Hadoop 
Configuration for S3/S3A*
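
   A hedged sketch of what this change log describes, assuming options arrive 
as URI query parameters that are copied into the Hadoop configuration (the 
helper name and exact matching rules are illustrative, not the PR's):
   ```java
   import java.net.URI;

   import org.apache.hadoop.conf.Configuration;

   final class UriConfigSketch {
       // Copy "fs.s3a.*" query parameters from a path URI, e.g.
       // s3a://bucket/path?fs.s3a.acl.default=private, into a Hadoop Configuration.
       static Configuration applyUriParams(URI uri, Configuration base) {
           Configuration conf = new Configuration(base);
           String query = uri.getQuery();
           if (query == null) {
               return conf;
           }
           for (String pair : query.split("&")) {
               int eq = pair.indexOf('=');
               if (eq > 0 && pair.startsWith("fs.s3a.")) {
                   conf.set(pair.substring(0, eq), pair.substring(eq + 1));
               }
           }
           return conf;
       }
   }
   ```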
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change added tests and can be verified as follows:
   
 - *Added unit test that verifies URI parameters are parsed and applied to 
Hadoop configuration*
 - *A version of this is running in production for an S3-backed sink using 
an AssumedRoleProvider and dynamic role ARN*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-26 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930018268


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =

Review Comment:
   > How about we just compute the `distinct labels` and postpone the `fill` 
operation to Line#199, e.g., `DataStream> 
categoricalStatistics =...`?
   > 
   > Using `parallelism=1` for computing all the data is usually not efficient.
   
   

[jira] [Updated] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener

2022-07-26 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-28696:
---
Description: 
This bug was introduced by FLINK-28660. The newly added logic means that the 
blocklist listener will not be notified when there are no newly added nodes 
(only merged nodes).

 

> Notify all newlyAdded/Merged blocked nodes to BlocklistListener
> ---
>
> Key: FLINK-28696
> URL: https://issues.apache.org/jira/browse/FLINK-28696
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lijie Wang
>Priority: Major
> Fix For: 1.16.0
>
>
> This bug was introduced by FLINK-28660. The newly added logic means that the 
> blocklist listener will not be notified when there are no newly added nodes 
> (only merged nodes).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28696) Notify all newlyAdded/Merged blocked nodes to BlocklistListener

2022-07-26 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-28696:
--

 Summary: Notify all newlyAdded/Merged blocked nodes to 
BlocklistListener
 Key: FLINK-28696
 URL: https://issues.apache.org/jira/browse/FLINK-28696
 Project: Flink
  Issue Type: Sub-task
Reporter: Lijie Wang
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27536) Rename method parameter in AsyncSinkWriter

2022-07-26 Thread DavidLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571447#comment-17571447
 ] 

DavidLiu commented on FLINK-27536:
--

[~dannycranmer] Please help review this Pull Request again, which has been 
updated. 

[https://github.com/apache/flink/pull/20360]

> Rename method parameter in AsyncSinkWriter
> --
>
> Key: FLINK-27536
> URL: https://issues.apache.org/jira/browse/FLINK-27536
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Assignee: DavidLiu
>Priority: Minor
>  Labels: pull-request-available
>
> Change the abstract method's parameter naming in AsyncSinkWriter.
> From
>   Consumer<List<RequestEntryT>> requestResult
> to
>   Consumer<List<RequestEntryT>> requestToRetry
> or something similar.
>  
> This is because the consumer here is supposed to accept a list of requests 
> that need to be retried.
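
Expressed as a sketch (the surrounding class is reduced to a stub; only the 
parameter rename mirrors the ticket):
{code:java}
import java.util.List;
import java.util.function.Consumer;

// Stub standing in for AsyncSinkWriter<InputT, RequestEntryT>.
abstract class AsyncWriterSketch<RequestEntryT> {
    // Before: Consumer<List<RequestEntryT>> requestResult
    // After:  Consumer<List<RequestEntryT>> requestToRetry
    // The consumer accepts the entries that need to be retried, so the new
    // name states the contract instead of hiding it.
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            Consumer<List<RequestEntryT>> requestToRetry);
}
{code}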



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-27536) Rename method parameter in AsyncSinkWriter

2022-07-26 Thread DavidLiu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27536 ]


DavidLiu deleted comment on FLINK-27536:
--

was (Author: JIRAUSER289843):
[~dannycranmer] Please help review this Pull Request again, which has been 
updated. 

[https://github.com/apache/flink/pull/20360]

 

 

 

> Rename method parameter in AsyncSinkWriter
> --
>
> Key: FLINK-27536
> URL: https://issues.apache.org/jira/browse/FLINK-27536
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Assignee: DavidLiu
>Priority: Minor
>  Labels: pull-request-available
>
> Change the abstract method's parameter naming in AsyncSinkWriter.
> From
>   Consumer<List<RequestEntryT>> requestResult
> to
>   Consumer<List<RequestEntryT>> requestToRetry
> or something similar.
>  
> This is because the consumer here is supposed to accept a list of requests 
> that need to be retried.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28695) Fail to send partition request to restarted taskmanager

2022-07-26 Thread Simonas (Jira)
Simonas created FLINK-28695:
---

 Summary: Fail to send partition request to restarted taskmanager
 Key: FLINK-28695
 URL: https://issues.apache.org/jira/browse/FLINK-28695
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Runtime / Network
Affects Versions: 1.15.1, 1.15.0
Reporter: Simonas
 Attachments: deployment.txt, job_log.txt, jobmanager_config.txt, 
jobmanager_logs.txt, pod_restart.txt, taskmanager_config.txt

After upgrading to *1.15.1* we started getting the following error while deploying a job:

 
{code:java}
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed.
    at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
{code}
{code:java}
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException 
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
 ChannelPromise)(Unknown Source){code}
After investigation we managed to narrow it down to the exact behavior that 
triggers this issue:
 # Deploying the job on a fresh kubernetes session cluster with multiple 
TaskManagers TM1 and TM2 is successful. The job has multiple partitions running 
on both TM1 and TM2.
 # One TaskManager TM2 (XXX.XXX.XX.32) fails for an unrelated issue, for 
example an OOM exception.
 # The kubernetes POD with TaskManager TM2 is restarted. The POD retains the 
same IP address as before.
 # The JobManager is able to pick up the restarted TM2 (XXX.XXX.XX.32).
 # The job is restarted because it was running on the failed TaskManager TM2.
 # The TM1 data channel to TM2 is closed and we get "LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed" while the 
job is running.
 # When we explicitly delete the POD with TM2, a new POD is created with a 
different IP address and the job is able to start again.

It is important to note that we didn't encounter this issue with the previous 
*1.14.4* version, where TaskManager restarts didn't cause such errors.

Please see the attached kubernetes deployments and reduced JobManager logs. 
The TaskManager logs did show errors before the failure, but don't show 
anything significant after the restart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongbo updated FLINK-28693:
---
Description: 
The following code will throw an exception:

 
{code:java}
Table program cannot be compiled. This is a bug. Please file an issue.
 ...
 Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: 
Cannot determine simple type name "org" {code}
{color:#00}Code:{color}
{code:java}
public class TestUdf extends  ScalarFunction {
@DataTypeHint("TIMESTAMP(3)")
public LocalDateTime eval(String strDate) {
   return LocalDateTime.now();
}
}

public class FlinkTest {
@Test
void testUdf() throws Exception {
//var env = StreamExecutionEnvironment.createLocalEnvironment();
// run `gradlew shadowJar` first to generate the uber jar.
// It contains the kafka connector and a dummy UDF function.

var env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
"build/libs/flink-test-all.jar");
env.setParallelism(1);
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);

var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("time_stamp", DataTypes.STRING())
.columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
.watermark("udf_ts", "udf_ts - INTERVAL '1' second")
.build())
// the kafka server doesn't need to exist. It fails in the 
compile stage before fetching data.
.option("properties.bootstrap.servers", "localhost:9092")
.option("topic", "test_topic")
.option("format", "json")
.option("scan.startup.mode", "latest-offset")
.build());
testTable.printSchema();
tableEnv.createTemporaryView("test", testTable );

var query = tableEnv.sqlQuery("select * from test");
var tableResult = 
query.executeInsert(TableDescriptor.forConnector("print").build());
tableResult.await();
}
}{code}
What does the code do?
 # read a stream from Kafka
 # create a derived column using a UDF expression
 # define the watermark based on the derived column

The full callstack:

 
{code:java}
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
 ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
 ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
 ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at 
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
 ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 [flink-dist-1.15.1.jar:1.15.1]
    at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 

[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

2022-07-26 Thread GitBox


lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929988353


##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java:
##
@@ -0,0 +1,110 @@
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for adding remote jar. */
+public class AddRemoteJarITCase {
+@ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+

Review Comment:
   We should not introduce this test; the local jar cases are already covered by 
existing tests. The remote jar test should be introduced in the e2e 
module.



##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java:
##
@@ -156,13 +156,7 @@ public void testAddJarWithRelativePath() throws 
IOException {
 
 @Test
 public void testAddIllegalJar() {
-validateAddJarWithException("/path/to/illegal.jar", "JAR file does not exist");
-}
-
-@Test
-public void testAddRemoteJar() {
-validateAddJarWithException(

Review Comment:
   This test should not be removed.



##
flink-table/flink-sql-client/pom.xml:
##
@@ -515,6 +522,26 @@ under the License.
provided

 
+   
+   org.apache.hadoop
+   hadoop-hdfs
+   test
+   
+
+   
+   org.apache.hadoop
+   hadoop-hdfs

Review Comment:
   I think introducing an HDFS dependency is a little heavy; we should test remote 
jars in the e2e module, so these related tests should be placed in the 
flink-end-to-end-tests-sql module.
   
   Regarding how to use an HDFS cluster in an e2e test, you can refer to the 
flink-end-to-end-tests-hbase module.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
 return 
functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
 }
 
+@Override
+public void addJar(String jarPath) {

Review Comment:
   Please add unit tests for `ADD JAR` and `SHOW JARS`, and an IT case for 
`ADD JAR` in `TableEnvironmentITCase`; you can refer to the related tests in 
`FunctionITCase`.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -575,6 +576,20 @@ void createFunction(
  */
 boolean dropTemporaryFunction(String path);
 
+/**
+ * Adds a jar to the classloader for use.
+ *
+ * @param jarPath The path of the jar to be added.
+ */
+void addJar(String jarPath);

Review Comment:
   We should not introduce these two methods. They are public API, and we cannot 
add public API arbitrarily; it should be discussed in the community first. We 
should reuse the existing `AddJarOperation` and `ShowJarsOperation`.
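
   For reference, a hedged sketch of what the statement-based route would look like 
from user code, assuming the ported syntax is executed through `executeSql` as in 
SQL Client (the jar path is hypothetical):

```java
// Hedged sketch: registering and listing jars via SQL statements rather
// than new public TableEnvironment methods. The jar path is hypothetical.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AddJarExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a user jar with the session classloader.
        tEnv.executeSql("ADD JAR '/path/to/udf.jar'");
        // List the jars registered in this session.
        tEnv.executeSql("SHOW JARS").print();
    }
}
```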



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571444#comment-17571444
 ] 

Hongbo commented on FLINK-28693:


Is it caused by a change in the codegen logic, or by a classloader problem 
introduced by the new module {{flink-table-planner-loader}}?
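
For context, here is a minimal, hypothetical sketch (not Flink code) of how Janino, 
the compiler behind CompileUtils, produces exactly this message when the generated 
source references a class that the compile-time classloader cannot resolve; the 
class name com.example.TestUdf is made up:

{code:java}
// Minimal Janino demo: compiling generated code that references a class
// missing from the compiler's classloader fails on the first package segment.
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.SimpleCompiler;

public class ClassLoaderRepro {
    public static void main(String[] args) throws Exception {
        SimpleCompiler compiler = new SimpleCompiler();
        try {
            // com.example.TestUdf is not visible to this compiler.
            compiler.cook(
                    "public class Gen {\n"
                            + "  public Object create() { return new com.example.TestUdf(); }\n"
                            + "}");
        } catch (CompileException e) {
            // Prints e.g.: Line 2, Column ...: Cannot determine simple type name "com"
            System.out.println(e.getMessage());
        }
    }
}
{code}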

> Codegen failed if the watermark is defined on a columnByExpression
> --
>
> Key: FLINK-28693
> URL: https://issues.apache.org/jira/browse/FLINK-28693
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.1
>Reporter: Hongbo
>Priority: Major
>
> The following code will throw an exception:
>  
> {code:java}
> Table program cannot be compiled. This is a bug. Please file an issue.
>  ...
>  Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 54: Cannot determine simple type name "org" {code}
> {color:#00}Code:{color}
> {code:java}
> public class TestUdf extends  ScalarFunction {
> @DataTypeHint("TIMESTAMP(3)")
> public LocalDateTime eval(String strDate) {
>return LocalDateTime.now();
> }
> }
> public class FlinkTest {
> @Test
> void testUdf() throws Exception {
> //var env = StreamExecutionEnvironment.createLocalEnvironment();
> // run `gradlew shadowJar` first to generate the uber jar.
> // It contains the kafka connector and a dummy UDF function.
> var env = 
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
> "build/libs/flink-test-all.jar");
> env.setParallelism(1);
> var tableEnv = StreamTableEnvironment.create(env);
> tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);
> var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
> .schema(Schema.newBuilder()
> .column("time_stamp", DataTypes.STRING())
> .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
> .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
> .build())
> // the kafka server doesn't need to exist. It fails in the compile stage before fetching data.
> .option("properties.bootstrap.servers", "localhost:9092")
> .option("topic", "test_topic")
> .option("format", "json")
> .option("scan.startup.mode", "latest-offset")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var query = tableEnv.sqlQuery("select * from test");
> var tableResult = 
> query.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> }
> }{code}
> What does the code do?
>  # read a stream from Kafka
>  # create a derived column using a UDF expression
>  # define the watermark based on the derived column
> The full callstack:
>  
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) 

[jira] [Updated] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongbo updated FLINK-28693:
---
Description: 
The following code will throw an exception:

 
{code:java}
Table program cannot be compiled. This is a bug. Please file an issue.
 ...
 Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: 
Cannot determine simple type name "org" {code}
{color:#00}Code:{color}
{code:java}
public class TestUdf extends  ScalarFunction {
@DataTypeHint("TIMESTAMP(3)")
public LocalDateTime eval(String strDate) {
   return LocalDateTime.now();
}
}

public class FlinkTest {
@Test
void testUdf() throws Exception {
//var env = StreamExecutionEnvironment.createLocalEnvironment();
// run `gradlew shadowJar` first to generate the uber jar.
// It contains the kafka connector and a dummy UDF function.

var env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
"build/libs/flink-test-all.jar");
env.setParallelism(1);
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);

var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("time_stamp", DataTypes.STRING())
.columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
.watermark("udf_ts", "udf_ts - INTERVAL '1' second")
.build())
// the kafka server doesn't need to exist. It fails in the compile stage before fetching data.
.option("properties.bootstrap.servers", "localhost:9092")
.option("topic", "test_topic")
.option("format", "json")
.option("scan.startup.mode", "latest-offset")
.build());
testTable.printSchema();
tableEnv.createTemporaryView("test", testTable );

var query = tableEnv.sqlQuery("select * from test");
var tableResult = 
query.executeInsert(TableDescriptor.forConnector("print").build());
tableResult.await();
}
}{code}
What does the code do?
 # read a stream from Kafka
 # create a derived column using a UDF expression
 # define the watermark based on the derived column

The full callstack:

 
{code:java}
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 

[jira] [Comment Edited] (FLINK-27536) Rename method parameter in AsyncSinkWriter

2022-07-26 Thread DavidLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571196#comment-17571196
 ] 

DavidLiu edited comment on FLINK-27536 at 7/26/22 1:54 PM:
---

[~dannycranmer]  Please help review this pull request again; it has been 
updated. 

[https://github.com/apache/flink/pull/20360]

 

 

 


was (Author: JIRAUSER289843):
[~dannycranmer]  The old PR was closed; please help review the new pull request

[https://github.com/apache/flink/pull/20360]

The build failure was fixed and verified locally.

 

> Rename method parameter in AsyncSinkWriter
> --
>
> Key: FLINK-27536
> URL: https://issues.apache.org/jira/browse/FLINK-27536
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Assignee: DavidLiu
>Priority: Minor
>  Labels: pull-request-available
>
> Change the abstract method's parameter naming in AsyncSinkWriter.
> From
>   Consumer<List<RequestEntryT>> requestResult
> to
>   Consumer<List<RequestEntryT>> requestToRetry
> or something similar.
>  
> This is because the consumer here is supposed to accept a list of requests 
> that need to be retried.
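
For illustration, a hedged sketch of how a connector's sink writer implements this 
method (the {{client}} and its response API are hypothetical, and the surrounding 
class and constructor are omitted). The rename only clarifies that the list handed 
to the consumer is the set of entries to retry:

{code:java}
// Hedged fragment of an AsyncSinkWriter<InputT, Record> subclass.
// "client" and its putRecordsAsync/response API are hypothetical.
@Override
protected void submitRequestEntries(
        List<Record> requestEntries, Consumer<List<Record>> requestsToRetry) {
    client.putRecordsAsync(
            requestEntries,
            response -> {
                if (response.hasFailures()) {
                    // Hand back only the entries that must be re-queued.
                    requestsToRetry.accept(response.failedEntries());
                } else {
                    // An empty list signals that the whole batch completed.
                    requestsToRetry.accept(Collections.emptyList());
                }
            });
}
{code}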



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28694) Set pipeline.name to resource name by default for application deployments

2022-07-26 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-28694:
---
Summary: Set pipeline.name to resource name by default for application 
deployments  (was: Set pipeline.name to resource name by default for 
Applications)

> Set pipeline.name to resource name by default for application deployments
> -
>
> Key: FLINK-28694
> URL: https://issues.apache.org/jira/browse/FLINK-28694
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Minor
>
> I think it would be nice to set pipeline.name by default to the resource name 
> for application deployments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28694) Set pipeline.name to resource name by default for application deployments

2022-07-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571443#comment-17571443
 ] 

Gyula Fora commented on FLINK-28694:


[~wangyang0918] do you see any downsides?

> Set pipeline.name to resource name by default for application deployments
> -
>
> Key: FLINK-28694
> URL: https://issues.apache.org/jira/browse/FLINK-28694
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Minor
>
> I think it would be nice to set pipeline.name by default to the resource name 
> for application deployments.
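
For concreteness, a hedged sketch (not actual operator code) of what the defaulting 
could look like when the effective configuration is assembled; {{resourceName}} 
stands for the FlinkDeployment metadata name:

{code:java}
// Hedged sketch: default pipeline.name to the Kubernetes resource name
// unless the user has set it explicitly in flinkConfiguration.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public final class PipelineNameDefaulting {
    static void applyDefaultPipelineName(Configuration conf, String resourceName) {
        if (!conf.contains(PipelineOptions.NAME)) {
            conf.set(PipelineOptions.NAME, resourceName);
        }
    }
}
{code}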



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2022-07-26 Thread Hongbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongbo updated FLINK-28693:
---
Description: 
The following code will throw an exception:

 
{code:java}
Table program cannot be compiled. This is a bug. Please file an issue.
 ...
 Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: 
Cannot determine simple type name "org" {code}
{color:#00}Code:{color}
{code:java}
public class TestUdf extends  ScalarFunction {
@DataTypeHint("TIMESTAMP(3)")
public LocalDateTime eval(String strDate) {
   return LocalDateTime.now();
}
}

public class FlinkTest {
@Test
void testUdf() throws Exception {
//var env = StreamExecutionEnvironment.createLocalEnvironment();
// run `gradlew shadowJar` first to generate the uber jar.
// It contains the kafka connector and a dummy UDF function.

var env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
"build/libs/flink-test-all.jar");
env.setParallelism(1);
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);

var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("time_stamp", DataTypes.STRING())
.columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
.watermark("udf_ts", "udf_ts - INTERVAL '1' second")
.build())
// the kafka server doesn't need to exist. It fails in the compile stage before fetching data.
.option("properties.bootstrap.servers", "localhost:9092")
.option("topic", "test_topic")
.option("format", "json")
.option("scan.startup.mode", "latest-offset")
.build());
testTable.printSchema();
tableEnv.createTemporaryView("test", testTable );

var query = tableEnv.sqlQuery("select * from test");
var tableResult = 
query.executeInsert(TableDescriptor.forConnector("print").build());
tableResult.await();
}
}{code}
What does the code do?
 # read a stream from Kafka
 # create a derived column using a UDF expression
 # define the watermark based on the derived column

The full callstack:

 
{code:java}
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.15.1.jar:1.15.1]
    at 
