Re: [PR] [FLINK-34537] Autoscaler JDBC Support HikariPool [flink-kubernetes-operator]

2024-05-15 Thread via GitHub


1996fanrui commented on PR #785:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/785#issuecomment-2114192076

   > Thanks @czy006 for the contribution and @1996fanrui for the review. LGTM 
on the whole. Just left a few minor comments. PTAL if you have the free time. 
:)
   
   Thanks @RocMarshal for the quick review, I have addressed all of your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35358) Breaking change when loading artifacts

2024-05-15 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35358:

Fix Version/s: 1.18.2

> Breaking change when loading artifacts
> --
>
> Key: FLINK-35358
> URL: https://issues.apache.org/jira/browse/FLINK-35358
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, flink-docker
>Affects Versions: 1.19.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: 1.18.2
>
>
> We have been using the following code snippet in our Dockerfiles for running 
> a Flink job in application mode
>  
> {code:java}
> FROM flink:1.18.1-scala_2.12-java17
> COPY --from=build /app/target/my-job*.jar 
> /opt/flink/usrlib/artifacts/my-job.jar
> USER flink {code}
>  
> This has been working since at least around Flink 1.14, but the 1.19 update 
> has broken our Dockerfiles. The fix is to move the jar file one directory level 
> up, so the code snippet becomes
>  
> {code:java}
> FROM flink:1.18.1-scala_2.12-java17
> COPY --from=build /app/target/my-job*.jar /opt/flink/usrlib/my-job.jar
> USER flink  {code}
>  
> We have not spent much time looking into the cause, but we get the following 
> stack trace
>  
> {code:java}
> myjob-jobmanager-1   | org.apache.flink.util.FlinkException: Could not load 
> the provided entrypoint class.
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:230)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:149)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.lambda$main$0(StandaloneApplicationClusterEntryPoint.java:90)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:89)
>  [flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   | Caused by: 
> org.apache.flink.client.program.ProgramInvocationException: The program's 
> entry point class 'my.company.job.MyJob' was not found in the jar file.
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:481)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:153)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:228)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     ... 4 more
> myjob-jobmanager-1   | Caused by: java.lang.ClassNotFoundException: 
> my.company.job.MyJob
> myjob-jobmanager-1   |     at java.net.URLClassLoader.findClass(Unknown 
> Source) ~[?:?]
> myjob-jobmanager-1   |     at java.lang.ClassLoader.loadClass(Unknown Source) 
> ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at java.lang.ClassLoader.loadClass(Unknown Source) 
> ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:197)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at java.lang.Class.forName0(Native Method) ~[?:?]
> myjob-jobmanager-1   |     at java.lang.Class.forName(Unknown Source) ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:479)
>  ~[flink-dist-1.19.0.jar:1.1

[jira] [Updated] (FLINK-35358) Breaking change when loading artifacts

2024-05-15 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35358:

Fix Version/s: 1.19.1
   (was: 1.18.2)

> Breaking change when loading artifacts
> --
>
> Key: FLINK-35358
> URL: https://issues.apache.org/jira/browse/FLINK-35358
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, flink-docker
>Affects Versions: 1.19.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> We have been using the following code snippet in our Dockerfiles for running 
> a Flink job in application mode
>  
> {code:java}
> FROM flink:1.18.1-scala_2.12-java17
> COPY --from=build /app/target/my-job*.jar 
> /opt/flink/usrlib/artifacts/my-job.jar
> USER flink {code}
>  
> This has been working since at least around Flink 1.14, but the 1.19 update 
> has broken our Dockerfiles. The fix is to move the jar file one directory level 
> up, so the code snippet becomes
>  
> {code:java}
> FROM flink:1.18.1-scala_2.12-java17
> COPY --from=build /app/target/my-job*.jar /opt/flink/usrlib/my-job.jar
> USER flink  {code}
>  
> We have not spent much time looking into the cause, but we get the following 
> stack trace
>  
> {code:java}
> myjob-jobmanager-1   | org.apache.flink.util.FlinkException: Could not load 
> the provided entrypoint class.
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:230)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:149)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.lambda$main$0(StandaloneApplicationClusterEntryPoint.java:90)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:89)
>  [flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   | Caused by: 
> org.apache.flink.client.program.ProgramInvocationException: The program's 
> entry point class 'my.company.job.MyJob' was not found in the jar file.
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:481)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:153)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:228)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     ... 4 more
> myjob-jobmanager-1   | Caused by: java.lang.ClassNotFoundException: 
> my.company.job.MyJob
> myjob-jobmanager-1   |     at java.net.URLClassLoader.findClass(Unknown 
> Source) ~[?:?]
> myjob-jobmanager-1   |     at java.lang.ClassLoader.loadClass(Unknown Source) 
> ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at java.lang.ClassLoader.loadClass(Unknown Source) 
> ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:197)
>  ~[flink-dist-1.19.0.jar:1.19.0]
> myjob-jobmanager-1   |     at java.lang.Class.forName0(Native Method) ~[?:?]
> myjob-jobmanager-1   |     at java.lang.Class.forName(Unknown Source) ~[?:?]
> myjob-jobmanager-1   |     at 
> org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.jav

Re: [PR] [FLINK-34537] Autoscaler JDBC Support HikariPool [flink-kubernetes-operator]

2024-05-15 Thread via GitHub


RocMarshal commented on code in PR #785:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/785#discussion_r1602676542


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.utils;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Hikari JDBC common util. */
+public class HikariJDBCUtil {
+
+public static Connection getConnection(Configuration conf) throws 
SQLException {
+final var jdbcUrl = conf.get(JDBC_URL);
+checkArgument(
+jdbcUrl != null,
+"%s is required when jdbc state store or jdbc event handler is 
used.",

Review Comment:
   What about extracting it as a static final constant for finer-grained testing and 
reference?
   e.g.:
   ```
   public static final String JDBC_URL_REQUIRED_HINT_FORMAT = "%s is required 
when jdbc state store or jdbc event handler is used.";
   ```
   If so, we could use the constant in the corresponding test cases. `anchor A1 
& anchor A2`
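   
   For illustration, a rough sketch of that idea (not the PR's actual code; anything 
not visible in the diff above is an assumption):
   ```java
   // In HikariJDBCUtil (sketch): one shared message format for production code and tests.
   public static final String JDBC_URL_REQUIRED_HINT_FORMAT =
           "%s is required when jdbc state store or jdbc event handler is used.";
   
   public static Connection getConnection(Configuration conf) throws SQLException {
       final var jdbcUrl = conf.get(JDBC_URL);
       checkArgument(jdbcUrl != null, JDBC_URL_REQUIRED_HINT_FORMAT, JDBC_URL.key());
       // (username/password wiring omitted in this sketch)
       final var hikariConfig = new HikariConfig();
       hikariConfig.setJdbcUrl(jdbcUrl);
       return new HikariDataSource(hikariConfig).getConnection();
   }
   ```
   The factory tests could then assert with 
`hasMessage(HikariJDBCUtil.JDBC_URL_REQUIRED_HINT_FORMAT, JDBC_URL.key())` instead of 
repeating the literal.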



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34537] Autoscaler JDBC Support HikariPool [flink-kubernetes-operator]

2024-05-15 Thread via GitHub


RocMarshal commented on code in PR #785:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/785#discussion_r1602673972


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.utils;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Hikari JDBC common util. */
+public class HikariJDBCUtil {
+
+public static Connection getConnection(Configuration conf) throws 
SQLException {
+final var jdbcUrl = conf.get(JDBC_URL);
+checkArgument(
+jdbcUrl != null,

Review Comment:
   ```suggestion
  !StringUtils.isNullOrWhitespaceOnly(jdbcUrl),
   ```
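   
   For context, roughly how the check would read with that suggestion applied 
(assuming `org.apache.flink.util.StringUtils` is imported in this file):
   ```java
   // Rejects null, empty and whitespace-only URLs instead of only null.
   checkArgument(
           !StringUtils.isNullOrWhitespaceOnly(jdbcUrl),
           "%s is required when jdbc state store or jdbc event handler is used.",
           JDBC_URL.key());
   ```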



##
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java:
##
@@ -58,25 +56,28 @@ void testCreateJdbcStateStoreWithoutURL() {
 conf.set(STATE_STORE_TYPE, JDBC);
 assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
 .isInstanceOf(IllegalArgumentException.class)
-.hasMessage("%s is required for jdbc state store.", 
JDBC_URL.key());
+.hasMessage(
+"%s is required when jdbc state store or jdbc event 
handler is used.",

Review Comment:
   anchor A2



##
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java:
##
@@ -59,25 +57,28 @@ void testCreateJdbcEventHandlerWithoutURL() {
 conf.set(EVENT_HANDLER_TYPE, JDBC);
 assertThatThrownBy(() -> AutoscalerEventHandlerFactory.create(conf))
 .isInstanceOf(IllegalArgumentException.class)
-.hasMessage("%s is required for jdbc event handler.", 
JDBC_URL.key());
+.hasMessage(
+"%s is required when jdbc state store or jdbc event 
handler is used.",

Review Comment:
   anchor A1



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.utils;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
+import static org.apache.flink.util.Preconditio

Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]

2024-05-15 Thread via GitHub


fredia commented on code in PR #24748:
URL: https://github.com/apache/flink/pull/24748#discussion_r1602677598


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+
+/**
+ * Epoch manager segments inputs into distinct epochs, marked by the arrival 
of non-records(e.g.
+ * watermark, record attributes). Records are assigned to a unique epoch based 
on their arrival,
+ * records within an epoch are allowed to be parallelized, while the 
non-record of an epoch can only
+ * be executed when all records in this epoch have finished.
+ *
+ * For more details please refer to FLIP-425.
+ */
+public class EpochManager {
+private static final Logger LOG = 
LoggerFactory.getLogger(EpochManager.class);
+
+/**
+ * This enum defines whether parallel execution between epochs is allowed. 
We should keep this
+ * internal and away from API module for now, until we could see the 
concrete need for {@link
+ * #PARALLEL_BETWEEN_EPOCH} from average users.
+ */
+public enum ParallelMode {
+/**
+ * Subsequent epochs must wait until the previous epoch is completed 
before they can start.
+ */
+SERIAL_BETWEEN_EPOCH,
+/**
+ * Subsequent epochs can begin execution even if the previous epoch 
has not yet completed.
+ * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
+ */
+PARALLEL_BETWEEN_EPOCH
+}
+
+/**
+ * The reference to the {@link AsyncExecutionController}, used for {@link
+ * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
+ */
+final AsyncExecutionController asyncExecutionController;
+
+/** The number of epochs that have arrived. */
+long epochNum;
+
+/** The output queue to hold ongoing epochs. */
+LinkedList<Epoch> outputQueue;
+
+/** Current active epoch, only one active epoch at the same time. */
+Epoch activeEpoch;
+
+public EpochManager(AsyncExecutionController aec) {
+this.epochNum = 0;
+this.outputQueue = new LinkedList<>();
+this.asyncExecutionController = aec;
+// init an empty epoch, the epoch action will be updated when 
non-record is received.
+initNewActiveEpoch();
+}
+
+/**
+ * Add a record to the current epoch and return the current open epoch, 
the epoch will be
+ * associated with the {@link RecordContext} of this record. Must be 
invoked within task thread.
+ *
+ * @return the current open epoch.
+ */
+public Epoch onRecord() {
+activeEpoch.ongoingRecordCount++;
+return activeEpoch;
+}
+
+/**
+ * Add a non-record to the current epoch, close current epoch and open a 
new epoch. Must be
+ * invoked within task thread.
+ *
+ * @param action the action associated with this non-record.
+ * @param parallelMode the parallel mode for this epoch.
+ */
+public void onNonRecord(Runnable action, ParallelMode parallelMode) {
+if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
+asyncExecutionController.drainInflightRecords(0);

Review Comment:
   > When doing drainInflightRecords, will the activeEpoch's ongoingRecordCount 
reach 0 and `completeOneRecord` be called?
   
   Yes, `completeOneRecord` would be called.
   I changed the logic to close the `activeEpoch` first and then drain the in-flight 
records. The open active epoch is not updated while `drainInflightRecords` runs, and 
updating the `ongoingRecordCount` of an already `closed` epoch is allowed.
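   
   A rough sketch of that ordering, using the names from the diff above (the 
`close(...)` helper here is illustrative, not necessarily the real method):
   ```java
   public void onNonRecord(Runnable action, ParallelMode parallelMode) {
       // Close the current epoch first and remember its action; records arriving from
       // now on are counted against a fresh active epoch.
       Epoch closingEpoch = activeEpoch;
       closingEpoch.close(action); // illustrative helper: mark closed and attach the action
       initNewActiveEpoch();
   
       if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
           // Draining may drive the closed epoch's ongoingRecordCount to 0, which is
           // what ultimately triggers its action via completeOneRecord().
           asyncExecutionController.drainInflightRecords(0);
       }
   }
   ```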



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]

2024-05-15 Thread via GitHub


loserwang1024 commented on PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#issuecomment-2114099034

   I am wondering how a commercial database sink like MaxCompute is supposed to do e2e tests?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35356][State] State descriptor and implementation for async reducing state [flink]

2024-05-15 Thread via GitHub


masteryhx commented on code in PR #24798:
URL: https://github.com/apache/flink/pull/24798#discussion_r1602658277


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link StateDescriptor} for {@link 
org.apache.flink.api.common.state.v2.ReducingState}.
+ *
+ * @param <V> The type of the values that can be added to the state.
+ */
+public class ReducingStateDescriptor<V> extends StateDescriptor<V> {
+
+private final ReduceFunction<V> reduceFunction;
+
+/**
+ * Creates a new {@code ReducingStateDescriptor} with the given name and 
default value.
+ *
+ * @param name The (unique) name for the state.
+ * @param reduceFunction The {@code ReduceFunction} used to aggregate the 
state.
+ * @param typeInfo The type of the values in the state.
+ */
+public ReducingStateDescriptor(
+String name, ReduceFunction<V> reduceFunction, TypeInformation<V> typeInfo) {
+super(name, typeInfo, null);

Review Comment:
   ```suggestion
   super(name, typeInfo);
   ```
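   
   For context, a sketch of the constructor with the suggested change applied (the 
null-check message is illustrative):
   ```java
   public ReducingStateDescriptor(
           String name, ReduceFunction<V> reduceFunction, TypeInformation<V> typeInfo) {
       super(name, typeInfo); // no extra default-value argument
       this.reduceFunction = checkNotNull(reduceFunction, "The reduce function must not be null.");
   }
   ```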



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35371) Allow the keystore and truststore type to configured for SSL

2024-05-15 Thread Ammar Master (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ammar Master updated FLINK-35371:
-
Description: 
Flink always creates a keystore and truststore using the [default 
type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
 defined in the JDK, which in most cases is JKS.

{code}

KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());

{code}

We should add other configuration options to set the type explicitly to support 
other custom formats, and match the options provided by other applications by 
[Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
 and 
[Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
 already. The default would continue to be specified by the JDK.

 

The SSLContext for the REST API can read the configuration option directly, and 
we need to add extra logic to the 
[CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
 for Pekko.
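
A rough sketch of the proposed behaviour (the option key below is purely illustrative, 
not an existing Flink option; {{config}} stands for the cluster Configuration):

{code:java}
// Illustrative only: "security.ssl.keystore-type" is a hypothetical key.
String keyStoreType = config.getString("security.ssl.keystore-type", KeyStore.getDefaultType());
KeyStore trustStore = KeyStore.getInstance(keyStoreType);
{code}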

  was:
Flink always creates a keystore and truststore using the [default 
type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
 defined in the JDK, which in most cases is JKS. 

{{KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());}}


We should add other configuration options to set the type explicitly to support 
other custom formats, and match the options provided by other applications by 
[Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
 and 
[Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
 already. The default would continue to be specified by the JDK.

 

The SSLContext for the REST API can read the configuration option directly, and 
we need to add extra logic to the 
[CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
 for Pekko.


> Allow the keystore and truststore type to configured for SSL
> 
>
> Key: FLINK-35371
> URL: https://issues.apache.org/jira/browse/FLINK-35371
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Ammar Master
>Priority: Minor
>  Labels: SSL
>
> Flink always creates a keystore and truststore using the [default 
> type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
>  defined in the JDK, which in most cases is JKS.
> {code}
> KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
> {code}
> We should add other configuration options to set the type explicitly to 
> support other custom formats, and match the options provided by other 
> applications by 
> [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
>  and 
> [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
>  already. The default would continue to be specified by the JDK.
>  
> The SSLContext for the REST API can read the configuration option directly, 
> and we need to add extra logic to the 
> [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
>  for Pekko.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35371) Allow the keystore and truststore type to configured for SSL

2024-05-15 Thread Ammar Master (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ammar Master updated FLINK-35371:
-
Description: 
Flink always creates a keystore and truststore using the [default 
type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
 defined in the JDK, which in most cases is JKS. 

{{KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());}}


We should add other configuration options to set the type explicitly to support 
other custom formats, and match the options provided by other applications by 
[Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
 and 
[Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
 already. The default would continue to be specified by the JDK.

 

The SSLContext for the REST API can read the configuration option directly, and 
we need to add extra logic to the 
[CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
 for Pekko.

  was:
Flink always creates a keystore and truststore using the [default 
type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
 defined in the JDK, which in most cases is JKS. We should add other 
configuration options to set the type explicitly to support other custom 
formats, and match the options provided by other applications by 
[Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
 and 
[Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
 already. The default would continue to be specified by the JDK.

 

The SSLContext for the REST API can read the configuration option directly, and 
we need to add extra logic to the 
[CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
 for Pekko.


> Allow the keystore and truststore type to configured for SSL
> 
>
> Key: FLINK-35371
> URL: https://issues.apache.org/jira/browse/FLINK-35371
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Ammar Master
>Priority: Minor
>  Labels: SSL
>
> Flink always creates a keystore and truststore using the [default 
> type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
>  defined in the JDK, which in most cases is JKS. 
> {{KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());}}
> We should add other configuration options to set the type explicitly to 
> support other custom formats, and match the options provided by other 
> applications by 
> [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
>  and 
> [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
>  already. The default would continue to be specified by the JDK.
>  
> The SSLContext for the REST API can read the configuration option directly, 
> and we need to add extra logic to the 
> [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
>  for Pekko.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35371) Allow the keystore and truststore type to configured for SSL

2024-05-15 Thread Ammar Master (Jira)
Ammar Master created FLINK-35371:


 Summary: Allow the keystore and truststore type to configured for 
SSL
 Key: FLINK-35371
 URL: https://issues.apache.org/jira/browse/FLINK-35371
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Ammar Master


Flink always creates a keystore and truststore using the [default 
type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
 defined in the JDK, which in most cases is JKS. We should add other 
configuration options to set the type explicitly to support other custom 
formats, and match the options provided by other applications by 
[Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
 and 
[Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
 already. The default would continue to be specified by the JDK.

 

The SSLContext for the REST API can read the configuration option directly, and 
we need to add extra logic to the 
[CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
 for Pekko.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35356][State] State descriptor and implementation for async reducing state [flink]

2024-05-15 Thread via GitHub


flinkbot commented on PR #24798:
URL: https://github.com/apache/flink/pull/24798#issuecomment-2114037580

   
   ## CI report:
   
   * a8938c86450f0cf9cc878926e560630e7f063e2c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35356) Async reducing state

2024-05-15 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35356:
---

Assignee: Zakelly Lan

> Async reducing state
> 
>
> Key: FLINK-35356
> URL: https://issues.apache.org/jira/browse/FLINK-35356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35356) Async reducing state

2024-05-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35356:
---
Labels: pull-request-available  (was: )

> Async reducing state
> 
>
> Key: FLINK-35356
> URL: https://issues.apache.org/jira/browse/FLINK-35356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35356][State] State descriptor and implementation for async reducing state [flink]

2024-05-15 Thread via GitHub


Zakelly opened a new pull request, #24798:
URL: https://github.com/apache/flink/pull/24798

   ## What is the purpose of the change
   
   This PR provides the definition of `StateDescriptor` and a simple 
implementation for `ReducingState` in `State V2`.
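   
   For orientation, a hedged usage sketch (the constructor shape follows this PR's 
diff; `Types.LONG` is assumed to come from `org.apache.flink.api.common.typeinfo.Types`):
   ```java
   // Sketch: a reducing state descriptor that keeps a running sum of Long values.
   ReducingStateDescriptor<Long> descriptor =
           new ReducingStateDescriptor<>("sum", Long::sum, Types.LONG);
   ```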
   
   
   ## Brief change log
   
- Definition and tests for `ReducingState` and `ReducingStateDescriptor`
   
   
   ## Verifying this change
   
   Added UT for each new introduced class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser [flink-connector-hive]

2024-05-15 Thread via GitHub


reswqa commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-hive/pull/18#discussion_r1602581492


##
flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java:
##
@@ -30,4 +30,6 @@ public class HiveParserConstants {
 /* Constants for insert overwrite directory */
 public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
 public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+public static final String 
SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS_CONFIG_NAME =

Review Comment:
   This name is indeed confusing. It makes me think that this configuration 
option is used to control whether SQL11 reserved keywords are allowed as 
identifiers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser [flink-connector-hive]

2024-05-15 Thread via GitHub


reswqa commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-hive/pull/18#discussion_r1602576000


##
flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java:
##
@@ -30,4 +30,6 @@ public class HiveParserConstants {
 /* Constants for insert overwrite directory */
 public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
 public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+public static final String 
SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS_CONFIG_NAME =

Review Comment:
   ```suggestion
   public static final String 
SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS =
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35370) Create a temp module to test backward compatibility

2024-05-15 Thread Jira
João Boto created FLINK-35370:
-

 Summary: Create a temp module to test backward compatibility
 Key: FLINK-35370
 URL: https://issues.apache.org/jira/browse/FLINK-35370
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC
Reporter: João Boto






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser [flink-connector-hive]

2024-05-15 Thread via GitHub


reswqa commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-hive/pull/18#discussion_r1602571412


##
flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierTest.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/** Test the compatibility with SQL11 reserved keywords in hive queries. */
+class HiveDialectSupportSQL11ReservedKeywordAsIdentifierTest {
+private static HiveCatalog hiveCatalog;
+private static TableEnvironment tableEnv;
+private static List<String> sql11ReservedKeywords;
+
+@BeforeAll
+static void setup() throws Exception {
+hiveCatalog = HiveTestUtils.createHiveCatalog();
+hiveCatalog
+.getHiveConf()
+.setBoolean(
+HiveParserConstants
+
.SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS_CONFIG_NAME,
+false);
+hiveCatalog.open();
+tableEnv = getTableEnvWithHiveCatalog();
+
tableEnv.getConfig().set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED,
 false);
+sql11ReservedKeywords =
+new ArrayList<>(
+Arrays.asList(

Review Comment:
   Why do we need the outer `new ArrayList<>`?
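   
   For illustration only (the keywords below are placeholders, not the test's real 
list): `Arrays.asList` already returns a fixed-size list, so the extra copy is only 
needed if the test later adds or removes elements.
   ```java
   // Sufficient if the list is never mutated afterwards.
   sql11ReservedKeywords = Arrays.asList("timestamp", "date", "exchange");
   ```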



##
flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java:
##
@@ -30,4 +30,6 @@ public class HiveParserConstants {
 /* Constants for insert overwrite directory */
 public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
 public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+public static final String 
SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS_CONFIG_NAME =

Review Comment:
   ```suggestion
   public static final String 
SUPPORT_SQL11_RESERVED_KEYWORDS_AS_IDENTIFIERS =
   ```



##
flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierTest.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.fli

Re: [PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]

2024-05-15 Thread via GitHub


melin commented on PR #3323:
URL: https://github.com/apache/flink-cdc/pull/3323#issuecomment-2113979566

   https://github.com/apache/paimon/issues/3316


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32082][docs] Documentation of checkpoint file-merging [flink]

2024-05-15 Thread via GitHub


Zakelly commented on code in PR #24766:
URL: https://github.com/apache/flink/pull/24766#discussion_r1602564828


##
docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md:
##
@@ -292,4 +292,26 @@ The final checkpoint would be triggered immediately after 
all operators have rea
 without waiting for periodic triggering, but the job will need to wait for 
this final checkpoint 
 to be completed.
 
+## Unify file merging mechanism for checkpoints (Experimental)
+
+The unified file merging mechanism for checkpointing is introduced to Flink 
1.20 as an MVP ("minimum viable product") feature, 
+which allows scattered small checkpoint files to be written into larger files, 
reducing the number of file creations 
+and file deletions, which alleviates the pressure of file system metadata 
management raised by the file flooding problem during checkpoints.
+The mechanism can be enabled by setting 
`state.checkpoints.file-merging.enabled` to `true`.
+**Note** that as a trade-off, enabling this mechanism may lead to space 
amplification, that is, the actual occupation on the file system
+will be larger than actual state size. 
`state.checkpoints.file-merging.max-space-amplification` 
+can be used to limit the upper bound of space amplification.
+
+This mechanism is applicable to keyed state, operator state and channel state 
in Flink. Merging at subtask level is 
+provided for shared scope state; Merging at TaskManager level is provided for 
private scope state. The maximum number of subtasks
+allowed to be written to a single file can be configured through the 
`state.checkpoints.file-merging.max-subtasks-per-file` option.
+
+This feature also supports merging files across checkpoints. To enable this, 
set
+`state.checkpoints.file-merging.across-checkpoint-boundary` to `true`.
+
+This mechanism introduces a file pool to handle concurrent writing scenarios. 
There are two modes, the non-blocking mode will 
+always provide usable physical file without blocking when receive a file 
request, it may create many physical files if poll 
+file frequently; while the blocking mode will be blocked until there are 
returned files available in the file pool. This can be configured via 
+`state.checkpoints.file-merging.pool-blocking`.

Review Comment:
   ```suggestion
   file frequently; while the blocking mode will be blocked until there are 
returned files available in the file pool. This can be configured via
   setting `state.checkpoints.file-merging.pool-blocking` as `true` for 
blocking or `false` for non-blocking.
   ```
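   
   (Side note while we are at it: a minimal sketch of enabling the feature from code, 
with the keys taken verbatim from the text above and purely illustrative values.)
   ```java
   Configuration conf = new Configuration();
   conf.setString("state.checkpoints.file-merging.enabled", "true");
   conf.setString("state.checkpoints.file-merging.across-checkpoint-boundary", "true");
   // Illustrative value; tune per workload.
   conf.setString("state.checkpoints.file-merging.max-subtasks-per-file", "8");
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
   ```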



##
docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md:
##
@@ -250,5 +250,20 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。
 极端情况下,如果 checkpoint 的周期被设置为 `Long.MAX_VALUE`,那么任务永远不会结束,因为下一次 checkpoint 不会进行。
 
+##  统一的 checkpoint 文件合并机制 (实验性功能)
+
+Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 
checkpoint 文件创建删除的次数,
+有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 
`true` 来开启该机制。
+**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`state.checkpoints.file-merging.max-space-amplification`
+来控制文件放大的上限。
+
+该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope 
state 
+提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过
+ `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
+
+统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。
+该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 
模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件;而

Review Comment:
   Add an empty line above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32088) Re-uploading in state file-merging for space amplification control

2024-05-15 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-32088:
---

Assignee: Zakelly Lan  (was: Han Yin)

> Re-uploading in state file-merging for space amplification control
> --
>
> Key: FLINK-32088
> URL: https://issues.apache.org/jira/browse/FLINK-32088
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [minor][cdc][docs] Optimize markdown formats in doris quickstart doc [flink-cdc]

2024-05-15 Thread via GitHub


Jiabao-Sun opened a new pull request, #3324:
URL: https://github.com/apache/flink-cdc/pull/3324

   
![image](https://github.com/apache/flink-cdc/assets/27403841/ffc84541-45f8-416a-a934-bcec1b9cabbc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32083) Chinese translation of documentation of checkpoint file-merging

2024-05-15 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan closed FLINK-32083.
---
Resolution: Duplicate

> Chinese translation of documentation of checkpoint file-merging
> ---
>
> Key: FLINK-32083
> URL: https://issues.apache.org/jira/browse/FLINK-32083
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Hangxiang Yu
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35325) Paimon connector miss the position of AddColumnEvent

2024-05-15 Thread tianzhu.wen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846791#comment-17846791
 ] 

tianzhu.wen commented on FLINK-35325:
-

pr refer to: https://github.com/apache/flink-cdc/pull/3323

> Paimon connector miss the position of AddColumnEvent
> 
>
> Key: FLINK-35325
> URL: https://issues.apache.org/jira/browse/FLINK-35325
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, new columns are always added in the last position; however, some 
> newly added columns have a specific before/after relationship with another 
> column.
> Source code:
> [https://github.com/apache/flink-cdc/blob/fa6e7ea51258dcd90f06036196618224156df367/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java#L137]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-35325) Paimon connector miss the position of AddColumnEvent

2024-05-15 Thread tianzhu.wen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35325 ]


tianzhu.wen deleted comment on FLINK-35325:
-

was (Author: JIRAUSER305385):
pr refer to: https://github.com/apache/flink-cdc/pull/3323

> Paimon connector miss the position of AddColumnEvent
> 
>
> Key: FLINK-35325
> URL: https://issues.apache.org/jira/browse/FLINK-35325
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, new columns are always added in the last position; however, some 
> newly added columns have a specific before/after relationship with another 
> column.
> Source code:
> [https://github.com/apache/flink-cdc/blob/fa6e7ea51258dcd90f06036196618224156df367/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java#L137]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]

2024-05-15 Thread via GitHub


joyCurry30 opened a new pull request, #3323:
URL: https://github.com/apache/flink-cdc/pull/3323

   Refer to jira: https://issues.apache.org/jira/browse/FLINK-35325
   
   Adding support for specifying column order when adding new columns to a 
table.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35325) Paimon connector miss the position of AddColumnEvent

2024-05-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35325:
---
Labels: pull-request-available  (was: )

> Paimon connector miss the position of AddColumnEvent
> 
>
> Key: FLINK-35325
> URL: https://issues.apache.org/jira/browse/FLINK-35325
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, new columns are always added in the last position; however, some 
> newly added columns have a specific before/after relationship with another 
> column.
> Source code:
> [https://github.com/apache/flink-cdc/blob/fa6e7ea51258dcd90f06036196618224156df367/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java#L137]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35353][docs-zh]Translate "Profiler" page into Chinese [flink]

2024-05-15 Thread via GitHub


Zakelly commented on PR #24793:
URL: https://github.com/apache/flink/pull/24793#issuecomment-2113919880

   @drymatini I think your commits include some wrong files by mistake; did you 
perhaps amend your commit on top of my previous one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [flink]

2024-05-15 Thread via GitHub


hackergin commented on PR #24767:
URL: https://github.com/apache/flink/pull/24767#issuecomment-2113912527

   @lsyldliu  Thanks for the updates and explanations,  LGTM. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35226][rest] Deprecate execution-mode in ExecutionConfigInfo related rest api [flink]

2024-05-15 Thread via GitHub


flinkbot commented on PR #24797:
URL: https://github.com/apache/flink/pull/24797#issuecomment-2113909464

   
   ## CI report:
   
   * 2b52d0c6115bda908bb717fbaedca29efde7e52c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35225][Web] Remove Execution mode in Flink WebUI [flink]

2024-05-15 Thread via GitHub


flinkbot commented on PR #24796:
URL: https://github.com/apache/flink/pull/24796#issuecomment-2113905453

   
   ## CI report:
   
   * 822fd38cfcecd1061ca4014c4461ace561303000 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35226) Deprecate execution-mode in ExecutionConfigInfo related rest api

2024-05-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35226:
---
Labels: pull-request-available  (was: )

> Deprecate execution-mode in ExecutionConfigInfo related rest api
> 
>
> Key: FLINK-35226
> URL: https://issues.apache.org/jira/browse/FLINK-35226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35226][rest] Deprecate execution-mode in ExecutionConfigInfo related rest api [flink]

2024-05-15 Thread via GitHub


1996fanrui opened a new pull request, #24797:
URL: https://github.com/apache/flink/pull/24797

   ## What is the purpose of the change
   
   See 
[FLIP-441](https://cwiki.apache.org/confluence/display/FLINK/FLIP-441%3A+Show+the+JobType+and+remove+Execution+Mode+on+Flink+WebUI)
   
   
   ## Brief change log
   
   - [FLINK-35226][rest] Deprecate execution-mode in ExecutionConfigInfo 
related rest api
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:31 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception {
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"

[jira] [Updated] (FLINK-35225) Remove Execution mode in Flink WebUI

2024-05-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35225:
---
Labels: pull-request-available  (was: )

> Remove Execution mode in Flink WebUI
> 
>
> Key: FLINK-35225
> URL: https://issues.apache.org/jira/browse/FLINK-35225
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:29 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception {
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"))

[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:28 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception {
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));

[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:27 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception {
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));

Re: [PR] [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [flink]

2024-05-15 Thread via GitHub


lsyldliu commented on code in PR #24767:
URL: https://github.com/apache/flink/pull/24767#discussion_r1602508279


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import static 
org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE;
+import static org.apache.flink.table.factories.FactoryUtil.stringifyOption;
+
+/** Utility for working with {@link WorkflowScheduler}. */
+@PublicEvolving
+public class WorkflowSchedulerFactoryUtil {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowSchedulerFactoryUtil.class);
+
+public static final String WORKFLOW_SCHEDULER_PREFIX = 
"workflow-scheduler";
+
+private WorkflowSchedulerFactoryUtil() {
+// no instantiation
+}
+
+/**
+ * Attempts to discover the appropriate workflow scheduler factory and 
creates the instance of
+ * the scheduler. Return null directly if doesn't specify the workflow 
scheduler in config
+ * because it is optional for materialized table.
+ */
+public static @Nullable WorkflowScheduler createWorkflowScheduler(
+Configuration configuration, ClassLoader classLoader) {

Review Comment:
   No, MaterializedTableManager will be created when a session is opened; it 
will be a member of SessionState, and we can only get the `Configuration` when 
constructing SessionState.
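
   As an editorial aside, a rough usage sketch of the utility under review. It 
assumes the factory method keeps the signature shown in the diff above (the 
generic parameters were stripped by the archive), and the call site and 
variable names are hypothetical.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
import org.apache.flink.table.workflow.WorkflowScheduler;

/** Hypothetical call site, e.g. while building SessionState; names are illustrative only. */
public class SchedulerDiscoverySketch {
    public static void main(String[] args) throws Exception {
        Configuration sessionConfiguration = new Configuration();
        // Returns null when no workflow scheduler is configured, since the
        // scheduler is optional for materialized tables.
        WorkflowScheduler<?> scheduler =
                WorkflowSchedulerFactoryUtil.createWorkflowScheduler(
                        sessionConfiguration, Thread.currentThread().getContextClassLoader());
        if (scheduler != null) {
            scheduler.open();
        }
    }
}
{code}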



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:22 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception {
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));

[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya commented on FLINK-35289:
---

package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", 
"false");
        properties.put("security.delegation.token.provider.hbase.enabled", 
"false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + 
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, 
paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), 
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));


        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        env.setRuntimeMode(RuntimeExecutionMode.BATCH);


        DataStream positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction() {
                    private transient MapState processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor mapStateDescriptor = 
new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class),
                                TypeInformation.of(Domain.class));
                        processedInputs = 
getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, 
KeyedProcessFunction.Context context, Collector 
out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        
context.timerService().registerEventTimeTimer(Long.MAX_VALUE);

                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> 
collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction() {
                    @Override
                    public void processElement(Domain value, 
ProcessFunction.Context ctx, Collector out) throws 
Exception {
                        log.info("Timestamp : {}, element : {}", 
ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");

    }

    public static DataStream domainStream(StreamExecutionEnvironment 
env) {

        /* Not assigning watermarks as program is being run in batch mode and 
watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");

    }

    private static List getDataCollection() {
        List data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));
        data.add(new Domain("A21", 

[jira] [Commented] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown

2024-05-15 Thread linshangquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846785#comment-17846785
 ] 

linshangquan commented on FLINK-35318:
--

[https://github.com/apache/flink/pull/24787]

Hi [~qingyue], could you please review this PR? Thank you very much!

> incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during 
> predicate pushdown
> -
>
> Key: FLINK-35318
> URL: https://issues.apache.org/jira/browse/FLINK-35318
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1
> Environment: flink version 1.18.1
> iceberg version 1.15.1
>Reporter: linshangquan
>Assignee: linshangquan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-05-09-14-06-58-007.png, 
> image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, 
> image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, 
> image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png
>
>
> In our scenario, we have an Iceberg table that contains a column named 'time' 
> of the {{timestamptz}} data type. This column has 10 rows of data where the 
> 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" 
> timezone.
> !image-2024-05-09-14-06-58-007.png!
>  
> We encountered a strange phenomenon when accessing the table using 
> Iceberg-flink.
> When the {{WHERE}} clause includes the {{time}} column, the results are 
> incorrect.
> ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" 
> !image-2024-05-09-18-52-03-741.png!
> When there is no {{WHERE}} clause, the results are correct.
> !image-2024-05-09-18-52-28-584.png!
> During debugging, we found that when a {{WHERE}} clause is present, a 
> {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes 
> {{RexNodeToExpressionConverter}} for translation.
> !image-2024-05-09-14-11-38-476.png!
> !image-2024-05-09-14-22-59-370.png!
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a 
> {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone 
> "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} 
> type. However, the upstream {{TimestampString}} data has already been 
> processed in UTC timezone. By applying the local timezone processing here, an 
> error occurs due to the mismatch in timezones.
> The question is whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} 
> data in {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it 
> should process the data in the UTC timezone.
>  
> Please help confirm if this is the issue, and if so, we can submit a patch to 
> fix it.
>  
>  
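
As an editorial aside for readers of this issue: the 8-hour shift described 
above can be reproduced with plain java.time, independent of Flink or Iceberg. 
A minimal sketch, using the literal value from the report:

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimezoneMismatchSketch {
    public static void main(String[] args) {
        // The wall-clock literal carried by the TimestampString in the report.
        LocalDateTime literal = LocalDateTime.parse("2024-04-30T07:00:00");

        // Interpreted in UTC (what the upstream data already is, per the report):
        Instant asUtc = literal.toInstant(ZoneOffset.UTC);                        // 2024-04-30T07:00:00Z

        // Interpreted in the session zone "Asia/Shanghai" (what visitLiteral does):
        Instant asLocal = literal.atZone(ZoneId.of("Asia/Shanghai")).toInstant(); // 2024-04-29T23:00:00Z

        // The two instants differ by 8 hours, so a pushed-down predicate built
        // from 'asLocal' no longer matches the rows it was meant to select.
        System.out.println(asUtc + " vs " + asLocal);
    }
}
{code}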



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [flink]

2024-05-15 Thread via GitHub


lsyldliu commented on code in PR #24767:
URL: https://github.com/apache/flink/pull/24767#discussion_r1602497989


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.refresh.RefreshHandler;
+import org.apache.flink.table.refresh.RefreshHandlerSerializer;
+
+/**
+ * This interface is used to interact with specific workflow scheduler 
services that support
+ * creating, modifying, and deleting refreshed workflow of Materialized Table.
+ *
+ * @param  The type of {@link RefreshHandler} used by specific {@link 
WorkflowScheduler} to
+ * locate the refresh workflow in scheduler service.
+ */
+@PublicEvolving
+public interface WorkflowScheduler {
+
+/**
+ * Open this workflow scheduler instance. Used for any required 
preparation in initialization
+ * phase.
+ *
+ * @throws WorkflowException if initializing workflow scheduler occur 
exception
+ */
+void open() throws WorkflowException;
+
+/**
+ * Close this workflow scheduler when it is no longer needed and release 
any resource that it
+ * might be holding.
+ *
+ * @throws WorkflowException if close the related resources of workflow 
scheduler failed
+ */
+void close() throws WorkflowException;
+
+/**
+ * Return a {@link RefreshHandlerSerializer} instance to serialize and 
deserialize {@link
+ * RefreshHandler} created by specific workflow scheduler service.
+ */
+RefreshHandlerSerializer getRefreshHandlerSerializer();
+
+/**
+ * Create a refresh workflow in specific scheduler service for the 
materialized table, return a
+ * {@link RefreshHandler} instance which can locate the refresh workflow 
detail information.
+ *
+ * This method supports creating workflow for periodic refresh, as well 
as workflow for a
+ * one-time refresh only.
+ *
+ * @param createRefreshWorkflow The detail info for create refresh 
workflow of materialized
+ * table.
+ * @return The meta info which points to the refresh workflow in scheduler 
service.
+ * @throws WorkflowException if create refresh workflow failed

Review Comment:
   I think it is not needed; see the related interface methods of Catalog, which 
do not declare an unsupported-operation exception in their signatures even 
though some operations are unsupported. Moreover, we have introduced 
WorkflowException for WorkflowScheduler, so I think we should try to wrap the 
exception into WorkflowException; it is a checked exception, and the user 
should be aware of it.
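
   For illustration, a minimal, self-contained sketch of the wrapping convention 
discussed here. The types below are local stand-ins, not the real 
org.apache.flink.table.workflow classes, and the scheduler client is invented 
for the example.

{code:java}
/** Local stand-in; the real checked exception lives in org.apache.flink.table.workflow. */
class WorkflowException extends Exception {
    WorkflowException(String message, Throwable cause) { super(message, cause); }
}

/** Stand-in for whatever client a concrete scheduler implementation talks to. */
class SchedulerClient {
    String submit(String request) throws java.io.IOException { return "workflow-42"; }
}

public class WrapToWorkflowExceptionSketch {
    private static final SchedulerClient CLIENT = new SchedulerClient();

    /** Any service-specific failure is wrapped into the checked WorkflowException. */
    static String createRefreshWorkflow(String materializedTable) throws WorkflowException {
        try {
            return CLIENT.submit("refresh " + materializedTable);
        } catch (Exception e) {
            throw new WorkflowException(
                    "Failed to create refresh workflow for " + materializedTable, e);
        }
    }

    public static void main(String[] args) throws WorkflowException {
        System.out.println(createRefreshWorkflow("my_materialized_table"));
    }
}
{code}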



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32092) Integrate snapshot file-merging with existing IT cases

2024-05-15 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846784#comment-17846784
 ] 

Yanfei Lei commented on FLINK-32092:


Merged into master via b87ead7

> Integrate snapshot file-merging with existing IT cases
> --
>
> Key: FLINK-32092
> URL: https://issues.apache.org/jira/browse/FLINK-32092
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32092) Integrate snapshot file-merging with existing IT cases

2024-05-15 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-32092.

Resolution: Resolved

> Integrate snapshot file-merging with existing IT cases
> --
>
> Key: FLINK-32092
> URL: https://issues.apache.org/jira/browse/FLINK-32092
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32092][tests] Integrate snapshot file-merging with existing ITCases [flink]

2024-05-15 Thread via GitHub


fredia merged PR #24789:
URL: https://github.com/apache/flink/pull/24789


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35325) Paimon connector miss the position of AddColumnEvent

2024-05-15 Thread tianzhu.wen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846782#comment-17846782
 ] 

tianzhu.wen commented on FLINK-35325:
-

I'd like to fix it, please assign it to me, thanks.

> Paimon connector miss the position of AddColumnEvent
> 
>
> Key: FLINK-35325
> URL: https://issues.apache.org/jira/browse/FLINK-35325
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Priority: Minor
> Fix For: cdc-3.2.0
>
>
> Currently, new columns are always added in the last position; however, some 
> newly added columns have a specific before/after relationship with another 
> column.
> Source code:
> [https://github.com/apache/flink-cdc/blob/fa6e7ea51258dcd90f06036196618224156df367/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java#L137]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846626#comment-17846626
 ] 

Shuai Xu edited comment on FLINK-34380 at 5/16/24 1:58 AM:
---

Hi [~rovboyko], sorry for the late reply. 
Regarding the incorrect order of output records: the minibatch optimization is 
designed to guarantee final consistency, and the fix you mentioned was 
considered when the PR was reviewed. Flink is a distributed realtime processing 
system. The order of output can be guaranteed on a single node by using a 
LinkedHashMap, but it cannot be guaranteed when the join operator runs on 
multiple nodes. So I think it makes little sense to preserve the order here.

Regarding the RowKind, it was also reviewed. As you mentioned, it is a common 
problem of the MiniBatch functionality. It does not influence the final result, 
so from a benefit perspective this problem is tolerable.


was (Author: JIRAUSER300096):
Hi [~rovboyko] , sorry for late reply. 
For the  incorrect order of output records, the minibatch optimization is 
designed to guanrantee final consistency. And the fix you mentioned has been 
considered when the pr was reviewed. Flink is a distributed realtime processing 
system. The order of output could be guanranteed on a node by using 
LinkedHashMap, however, it could not be guranteed when join operator runs on 
multiple nodes. So I think it makes little sense to keep the order here.

For the Rowkind, it was also reviewed. As you mentioned, it is a common problem 
of MiniBatch functionality. It does not influence final result. From the 
benefit perspective, this problem could be tolerable.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}
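
Editorial note for readers hitting this behavior: the reordering discussed above 
only appears when mini-batch is enabled (it is off by default), so jobs that 
need strict per-record changelog order can simply leave it disabled. A minimal 
sketch using the public configuration key:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchOffSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Mini-batch is disabled by default; setting it explicitly documents the intent.
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "false");
    }
}
{code}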



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint

2024-05-15 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846777#comment-17846777
 ] 

Rui Fan commented on FLINK-33109:
-

Thanks [~YordanPavlov] for the feedback, let me close this JIRA.

> Watermark alignment not applied after recovery from checkpoint
> --
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Yordan Pavlov
>Priority: Major
> Attachments: WatermarkTest-1.scala, 
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where after recovery from a checkpoint the Kafka 
> source watermarks would start to diverge not honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg => 
> msg.value.getTimestamp)
>       .withWatermarkAlignment("alignment-sources-group", 
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>  
> This works great up until my job needs to recover from checkpoint. Once the 
> recovery takes place, no alignment is taking place any more. This can best be 
> illustrated by looking at the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks disperse after the recovery. Trying to debug 
> the problem I noticed that before the failure there would be calls in
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> after the recovery, no calls get there, so no value for 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem If I stop the job, clear all 
> state from Zookeeper and then manually start Flink providing the last 
> checkpoint with 
> {code:java}
> '–fromSavepoint'{code}
>  flag. This would cause the SourceCoordinator to be constructed properly and 
> watermark drift to be checked. Once recovered manually, watermarks would again 
> converge to the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know If I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint

2024-05-15 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-33109.
---
Resolution: Not A Problem

> Watermark alignment not applied after recovery from checkpoint
> --
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Yordan Pavlov
>Priority: Major
> Attachments: WatermarkTest-1.scala, 
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where after recovery from a checkpoint the Kafka 
> source watermarks would start to diverge not honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg => 
> msg.value.getTimestamp)
>       .withWatermarkAlignment("alignment-sources-group", 
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>  
> This works great up until my job needs to recover from checkpoint. Once the 
> recovery takes place, no alignment is taking place any more. This can best be 
> illustrated by looking at the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks disperse after the recovery. Trying to debug 
> the problem I noticed that before the failure there would be calls in
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> after the recovery, no calls get there, so no value for 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem If I stop the job, clear all 
> state from Zookeeper and then manually start Flink providing the last 
> checkpoint with 
> {code:java}
> '–fromSavepoint'{code}
>  flag. This would cause the SourceCoordinator to be constructed properly and 
> watermark drift to be checked. Once recovery manually watermarks would again 
> converge to the allowed drift as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know If I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-15 Thread via GitHub


flinkbot commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2113596687

   
   ## CI report:
   
   * c3b588c4a44d135ccebe6acafa75e3b1a9aa4895 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] flink parquet writer support write nested array or map type [flink]

2024-05-15 Thread via GitHub


ViktorCosenza commented on PR #23881:
URL: https://github.com/apache/flink/pull/23881#issuecomment-2113594484

   > Hi @LoveHeat, can you cherry-pick the change in #24029 from @ukby1234 and 
merge the two PRs?
   
   @xccui, I've created a PR cherry-picking both commits: 
https://github.com/apache/flink/pull/24795


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Flink 33759 [flink]

2024-05-15 Thread via GitHub


ViktorCosenza opened a new pull request, #24795:
URL: https://github.com/apache/flink/pull/24795

   
   
   ## What is the purpose of the change
   This Pull request adds commits from 
https://github.com/apache/flink/pull/24029 and 
https://github.com/apache/flink/pull/23881 to fully resolve issue 
https://issues.apache.org/jira/browse/FLINK-33759
   
   ## Brief change log
   Adds support for nested Array, Map, and Row types in the Parquet writer. Previously 
the writer silently wrote nothing for these types.
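   As a rough illustration of what the fix enables (a sketch only, with made-up schema and output path, not code from this PR), a writer factory for a row that nests an array, a map, and a row can be built via `ParquetRowDataBuilder` and handed to a `FileSink`:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class NestedParquetSinkSketch {

    public static FileSink<RowData> buildSink() {
        // One field of each previously unsupported nested kind: array, map, row.
        RowType schema =
                RowType.of(
                        new LogicalType[] {
                            new ArrayType(new IntType()),
                            new MapType(new VarCharType(VarCharType.MAX_LENGTH), new DoubleType()),
                            RowType.of(new IntType(), new DoubleType())
                        },
                        new String[] {"ints", "scores", "pair"});

        ParquetWriterFactory<RowData> factory =
                ParquetRowDataBuilder.createWriterFactory(schema, new Configuration(), true);

        // Output path is a placeholder.
        return FileSink.forBulkFormat(new Path("/tmp/nested-parquet"), factory).build();
    }
}
```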
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   
   - Added a test to the Parquet writer using compression with all possible nested types.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): don't think 
so; previously the writer would return immediately for these cases and write 
nothing.
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no, it's a fix
 - If yes, how is the feature documented? na
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-15 Thread via GitHub


jeyhunkarimov commented on code in PR #24788:
URL: https://github.com/apache/flink/pull/24788#discussion_r1602288457


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##
@@ -234,20 +234,16 @@ private static boolean isSuitableFilter(RexNode 
filterCondition) {
 }
 
 private void setTables(ContextResolvedTable catalogTable) {
-if (tables.size() == 0) {
-tables.add(catalogTable);
-} else {
-boolean hasAdded = false;
-for (ContextResolvedTable thisTable : new ArrayList<>(tables)) 
{
-if (hasAdded) {
-break;
-}
-if 
(!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
-tables.add(catalogTable);
-hasAdded = true;
-}
+boolean alreadyExists = false;
+for (ContextResolvedTable table : tables) {

Review Comment:
   Good point. But note that `tables` is of type `Set` already
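   For illustration, a minimal sketch of what an identifier-based duplicate check could look like (hypothetical rewrite, not part of this PR); since `tables` is a `Set` of `ContextResolvedTable` rather than of identifiers, the lookup still has to go through `getIdentifier()`:

```java
// Hypothetical sketch: derive "already exists" from the identifiers in the set.
// Assumes `tables` is a Set<ContextResolvedTable> and `catalogTable` is the candidate.
boolean alreadyExists =
        tables.stream()
                .anyMatch(t -> t.getIdentifier().equals(catalogTable.getIdentifier()));
if (!alreadyExists) {
    tables.add(catalogTable);
}
```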



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Adding Opensearch Connector v2.0.0 [flink-web]

2024-05-15 Thread via GitHub


snuyanzin opened a new pull request, #741:
URL: https://github.com/apache/flink-web/pull/741

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846759#comment-17846759
 ] 

Jeyhun Karimov commented on FLINK-35289:


 
{code:java}
carData.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple4<Integer, Integer, Double, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((car, ts) -> car.f0))
        .keyBy(value -> value.f0)
        .process(new KeyedProcessFunction<Integer, Tuple4<Integer, Integer, Double, Long>, Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public void processElement(
                    Tuple4<Integer, Integer, Double, Long> value,
                    KeyedProcessFunction<Integer, Tuple4<Integer, Integer, Double, Long>, Tuple4<Integer, Integer, Double, Long>>.Context ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out)
                    throws Exception {
                ctx.timerService().registerProcessingTimeTimer(Long.MAX_VALUE);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out) throws Exception {
                out.collect(new Tuple4<>(1, 1, 1.0, 1L));
            }
        })
        .process(new ProcessFunction<Tuple4<Integer, Integer, Double, Long>, Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public void processElement(
                    Tuple4<Integer, Integer, Double, Long> value,
                    ProcessFunction<Tuple4<Integer, Integer, Double, Long>, Tuple4<Integer, Integer, Double, Long>>.Context ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out)
                    throws Exception {
                // log the timestamp attached to the element emitted from onTimer
                System.out.println(ctx.timestamp());
            }
        }){code}
 

If this is the example you mean, then the second process function does not
output a timestamp of Long.MAX_VALUE.

If you have something else in mind, please provide a minimal reproducible example.
Thanks

 

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode  all registered timers will fire at the _end of time. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements that are collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally this should be the time when the batch actually 
> executed  the onTimer function._



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35369) Improve `Table API and SQL` overview page or add new page to guide new users to right Flink SQL option

2024-05-15 Thread Keith Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Lee updated FLINK-35369:
--
Description: 
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Compile and submit code that uses Table API through Flink Client 
(Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API to JobManager REST 
endpoint 

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option.  !LandscapeOfFlinkSQL.drawio(6).png!

  was:
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Compile and submit through Flink Client (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API to JobManager REST 
endpoint 

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option.  !LandscapeOfFlinkSQL.drawio(6).png!


> Improve `Table API and SQL` overview page or add new page to guide new users 
> to right Flink SQL option
> --
>
> Key: FLINK-35369
> URL: https://issues.apache.org/jira/browse/FLINK-35369
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.19.0
>Reporter: Keith Lee
>Priority: Major
> Attachments: LandscapeOfFlinkSQL.drawio(6).png
>
>
> Flink has rich and varied SQL offerings/deployment mode, it can take some 
> time for new users to investigate and arrive at the right offering for them. 
> Consider the available options:
> 1. Flink SQL Client (through SQL gateway, embedded or remote)
> 2. REST through SQL Gateway
> 3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
> 4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
> interface)
> 5. Compile and submit code that uses Table API through Flink Client 
> (Java/Scala/Python)
> 6. Submitting packaged archive with code that uses Table API to JobManager 
> REST endpoint 
> (Additionally, Apache Zeppelin also provide notebook experience with its 
> Flink SQL interpreter which builds upon Flink Client.)
> The improvement being suggested here is to either enrich existing [Table API 
> and SQL overview 
> page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
>  or create new page that contains the following information:
> 1. Diagram on the various options available (see diagram below)
> 2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
> experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
> client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
> with Hive etc. The table will guide users to the corresponding pa

[jira] [Updated] (FLINK-35369) Improve `Table API and SQL` overview page or add new page to guide new users to right Flink SQL option

2024-05-15 Thread Keith Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Lee updated FLINK-35369:
--
Summary: Improve `Table API and SQL` overview page or add new page to guide 
new users to right Flink SQL option  (was: Improve `Table API and SQL` page or 
add new page to guide new users to right Flink SQL option)

> Improve `Table API and SQL` overview page or add new page to guide new users 
> to right Flink SQL option
> --
>
> Key: FLINK-35369
> URL: https://issues.apache.org/jira/browse/FLINK-35369
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.19.0
>Reporter: Keith Lee
>Priority: Major
> Attachments: LandscapeOfFlinkSQL.drawio(6).png
>
>
> Flink has rich and varied SQL offerings/deployment mode, it can take some 
> time for new users to investigate and arrive at the right offering for them. 
> Consider the available options:
> 1. Flink SQL Client (through SQL gateway, embedded or remote)
> 2. REST through SQL Gateway
> 3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
> 4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
> interface)
> 5. Compile and submit through Flink Client (Java/Scala/Python)
> 6. Submitting packaged archive with code that uses Table API to JobManager 
> REST endpoint 
> (Additionally, Apache Zeppelin also provide notebook experience with its 
> Flink SQL interpreter which builds upon Flink Client.)
> The improvement being suggested here is to either enrich existing [Table API 
> and SQL overview 
> page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
>  or create new page that contains the following information:
> 1. Diagram on the various options available (see diagram below)
> 2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
> experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
> client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
> with Hive etc. The table will guide users to the corresponding page for each 
> option.  !LandscapeOfFlinkSQL.drawio(6).png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35369) Improve `Table API and SQL` page or add new page to guide new users to right Flink SQL option

2024-05-15 Thread Keith Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Lee updated FLINK-35369:
--
Description: 
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Compile and submit through Flink Client (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API to JobManager REST 
endpoint 

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option.  !LandscapeOfFlinkSQL.drawio(6).png!

  was:
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Flink Client submitting packaged application (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API to JobManager REST 
endpoint 

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option.  !LandscapeOfFlinkSQL.drawio(6).png!


> Improve `Table API and SQL` page or add new page to guide new users to right 
> Flink SQL option
> -
>
> Key: FLINK-35369
> URL: https://issues.apache.org/jira/browse/FLINK-35369
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.19.0
>Reporter: Keith Lee
>Priority: Major
> Attachments: LandscapeOfFlinkSQL.drawio(6).png
>
>
> Flink has rich and varied SQL offerings/deployment mode, it can take some 
> time for new users to investigate and arrive at the right offering for them. 
> Consider the available options:
> 1. Flink SQL Client (through SQL gateway, embedded or remote)
> 2. REST through SQL Gateway
> 3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
> 4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
> interface)
> 5. Compile and submit through Flink Client (Java/Scala/Python)
> 6. Submitting packaged archive with code that uses Table API to JobManager 
> REST endpoint 
> (Additionally, Apache Zeppelin also provide notebook experience with its 
> Flink SQL interpreter which builds upon Flink Client.)
> The improvement being suggested here is to either enrich existing [Table API 
> and SQL overview 
> page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
>  or create new page that contains the following information:
> 1. Diagram on the various options available (see diagram below)
> 2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
> experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
> client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
> with Hive etc. The table will guide users to the corresponding page for each 
> option.  !LandscapeOfFlinkSQL.drawio(6).png!



--
T

[PR] Adding Opensearch Connector v1.2.0 [flink-web]

2024-05-15 Thread via GitHub


snuyanzin opened a new pull request, #740:
URL: https://github.com/apache/flink-web/pull/740

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35369) Improve `Table API and SQL` page or add new page to guide new users to right Flink SQL option

2024-05-15 Thread Keith Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Lee updated FLINK-35369:
--
Description: 
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Flink Client submitting packaged application (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API to JobManager REST 
endpoint 

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option.  !LandscapeOfFlinkSQL.drawio(6).png!

  was:
Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Flink Client submitting packaged application (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option. !LandscapeOfFlinkSQL.drawio(6).png!


> Improve `Table API and SQL` page or add new page to guide new users to right 
> Flink SQL option
> -
>
> Key: FLINK-35369
> URL: https://issues.apache.org/jira/browse/FLINK-35369
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.19.0
>Reporter: Keith Lee
>Priority: Major
> Attachments: LandscapeOfFlinkSQL.drawio(6).png
>
>
> Flink has rich and varied SQL offerings/deployment mode, it can take some 
> time for new users to investigate and arrive at the right offering for them. 
> Consider the available options:
> 1. Flink SQL Client (through SQL gateway, embedded or remote)
> 2. REST through SQL Gateway
> 3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
> 4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
> interface)
> 5. Flink Client submitting packaged application (Java/Scala/Python)
> 6. Submitting packaged archive with code that uses Table API to JobManager 
> REST endpoint 
> (Additionally, Apache Zeppelin also provide notebook experience with its 
> Flink SQL interpreter which builds upon Flink Client.)
> The improvement being suggested here is to either enrich existing [Table API 
> and SQL overview 
> page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
>  or create new page that contains the following information:
> 1. Diagram on the various options available (see diagram below)
> 2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
> experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
> client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
> with Hive etc. The table will guide users to the corresponding page for each 
> option.  !LandscapeOfFlinkSQL.drawio(6).png!



--
This message was sent 

[jira] [Created] (FLINK-35369) Improve `Table API and SQL` page or add new page to guide new users to right Flink SQL option

2024-05-15 Thread Keith Lee (Jira)
Keith Lee created FLINK-35369:
-

 Summary: Improve `Table API and SQL` page or add new page to guide 
new users to right Flink SQL option
 Key: FLINK-35369
 URL: https://issues.apache.org/jira/browse/FLINK-35369
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Affects Versions: 1.19.0
Reporter: Keith Lee
 Attachments: LandscapeOfFlinkSQL.drawio(6).png

Flink has rich and varied SQL offerings/deployment mode, it can take some time 
for new users to investigate and arrive at the right offering for them. 
Consider the available options:

1. Flink SQL Client (through SQL gateway, embedded or remote)
2. REST through SQL Gateway
3. A SQL client with Flink JDBC driver (through SQL gateway's REST interface)
4. A SQL client with Hive JDBC driver (through SQL gateway's HiveServer2 
interface)
5. Flink Client submitting packaged application (Java/Scala/Python)
6. Submitting packaged archive with code that uses Table API

(Additionally, Apache Zeppelin also provide notebook experience with its Flink 
SQL interpreter which builds upon Flink Client.)

The improvement being suggested here is to either enrich existing [Table API 
and SQL overview 
page|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/]
 or create new page that contains the following information:

1. Diagram on the various options available (see diagram below)
2. Table explaining pros of each approach e.g. Flink SQL Client for initial 
experimentation, development, OLAP. Implementing on top of Flink SQL JDBC 
client or SQL Gateway REST for automation, HiveServer2 for inter-operabilty 
with Hive etc. The table will guide users to the corresponding page for each 
option. !LandscapeOfFlinkSQL.drawio(6).png!
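For instance, option 3 could be illustrated on the proposed page with a few lines of plain JDBC. A sketch only: the localhost URL and the SQL Gateway's default REST port 8083 are assumptions, and flink-sql-jdbc-driver must be on the classpath.
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FlinkJdbcSketch {
    public static void main(String[] args) throws Exception {
        // URL is an assumption: SQL Gateway REST endpoint running on localhost:8083.
        try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
{code}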



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35094) SinkTestSuiteBase.testScaleDown is hanging for 1.20-SNAPSHOT

2024-05-15 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-35094:

Fix Version/s: opensearch-2.0.0

> SinkTestSuiteBase.testScaleDown is hanging for 1.20-SNAPSHOT
> 
>
> Key: FLINK-35094
> URL: https://issues.apache.org/jira/browse/FLINK-35094
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: elasticsearch-3.1.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: opensearch-1.2.0, elasticsearch-3.2.0, opensearch-2.0.0
>
>
> Currently it is reproduced for elastic search connector
> all the ci jobs (for all jdks) against 1.20-SNAPSHOT are hanging on 
> {noformat}
> 2024-04-12T05:56:50.6179284Z "main" #1 prio=5 os_prio=0 cpu=18726.96ms 
> elapsed=2522.03s tid=0x7f670c025a50 nid=0x3c6d waiting on condition  
> [0x7f6712513000]
> 2024-04-12T05:56:50.6180667Zjava.lang.Thread.State: TIMED_WAITING 
> (sleeping)
> 2024-04-12T05:56:50.6181497Z  at 
> java.lang.Thread.sleep(java.base@17.0.10/Native Method)
> 2024-04-12T05:56:50.6182762Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:152)
> 2024-04-12T05:56:50.6184456Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> 2024-04-12T05:56:50.6186346Z  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.checkResultWithSemantic(SinkTestSuiteBase.java:504)
> 2024-04-12T05:56:50.6188474Z  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.restartFromSavepoint(SinkTestSuiteBase.java:327)
> 2024-04-12T05:56:50.6190145Z  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.testScaleDown(SinkTestSuiteBase.java:224)
> 2024-04-12T05:56:50.6191247Z  at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.10/Native
>  Method)
> 2024-04-12T05:56:50.6192806Z  at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.10/NativeMethodAccessorImpl.java:77)
> 2024-04-12T05:56:50.6193863Z  at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.10/DelegatingMethodAccessorImpl.java:43)
> 2024-04-12T05:56:50.6194834Z  at 
> java.lang.reflect.Method.invoke(java.base@17.0.10/Method.java:568)
> {noformat}
> for 1.17, 1.18, 1.19 there is no such issue and everything is ok
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/8538572134



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint

2024-05-15 Thread Yordan Pavlov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846747#comment-17846747
 ] 

Yordan Pavlov commented on FLINK-33109:
---

Hi [~pnowojski], yes, we do not observe the problem in 1.18.

> Watermark alignment not applied after recovery from checkpoint
> --
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Yordan Pavlov
>Priority: Major
> Attachments: WatermarkTest-1.scala, 
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where after recovery from a checkpoint the Kafka 
> source watermarks would start to diverge not honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg => 
> msg.value.getTimestamp)
>       .withWatermarkAlignment("alignment-sources-group", 
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>  
> This works great up until my job needs to recover from checkpoint. Once the 
> recovery takes place, no alignment is taking place any more. This can best be 
> illustrated by looking at the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks disperse after the recovery. Trying to debug 
> the problem I noticed that before the failure there would be calls in
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> after the recovery, no calls get there, so no value for 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem If I stop the job, clear all 
> state from Zookeeper and then manually start Flink providing the last 
> checkpoint with 
> {code:java}
> '–fromSavepoint'{code}
>  flag. This would cause the SourceCoordinator to be constructed properly and 
> watermark drift to be checked. Once recovery manually watermarks would again 
> converge to the allowed drift as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know If I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23307) Rowtime attributes are not properly resolved for views

2024-05-15 Thread Glauber M Dantas (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846732#comment-17846732
 ] 

Glauber M Dantas commented on FLINK-23307:
--

I'm facing the same issue while using the JDBC connector to enrich the stream 
data with data retrieved from an external database. I created a StackOverflow 
discussion with more details: 
[https://stackoverflow.com/questions/78485099/sedona-flink-sql-lookup-on-external-database-failing-when-using-for-system-time]

Using Flink 1.18.1.

> Rowtime attributes are not properly resolved for views
> --
>
> Key: FLINK-23307
> URL: https://issues.apache.org/jira/browse/FLINK-23307
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: Ingo Bürk
>Assignee: Ingo Bürk
>Priority: Major
>
> If a view is contains a rowtime attribute from an underlying table and is 
> then used in a temporal join, an error is thrown
> {quote}Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' 
> left table's time attribute field
> {quote}
> This does not happen with the generic in-memory catalog, because it copies 
> the schema as-is. However, if a catalog implementation is used which persists 
> the schema without the rowtime information (which is correct, since this is 
> defined by the underlying table and not the view itself), the catalog can 
> only return said schema for a view. This then causes this issue during 
> planning.
> Specifically, this happens in SqlCatalogViewTable#convertToRel. After the 
> call to context#expandView, the rowtime attribute is correctly present, but 
> the inserted cast from RelOptUtil#createCastRel throws this information away.
> The following SQL reproduces the issue. Again, please note that this does NOT 
> work with the default in-memory catalog:
> {code:java}
> CREATE TABLE A (
> id INT,
> ts TIMESTAMP(3),
> WATERMARK FOR ts AS ts,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen'
> );
> CREATE VIEW B AS SELECT * FROM A;
> SELECT * FROM B JOIN A FOR SYSTEM_TIME AS OF B.ts ON B.id = A.id;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35363) FLIP-449: Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846731#comment-17846731
 ] 

João Boto commented on FLINK-35363:
---

Can anyone assign this to me, please?

> FLIP-449: Reorganization of flink-connector-jdbc
> 
>
> Key: FLINK-35363
> URL: https://issues.apache.org/jira/browse/FLINK-35363
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: João Boto
>Priority: Major
>
> Described in: 
> [FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35363) FLIP-449: Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

João Boto updated FLINK-35363:
--
Summary: FLIP-449: Reorganization of flink-connector-jdbc  (was: 
Reorganization of flink-connector-jdbc)

> FLIP-449: Reorganization of flink-connector-jdbc
> 
>
> Key: FLINK-35363
> URL: https://issues.apache.org/jira/browse/FLINK-35363
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: João Boto
>Priority: Major
>
> Described in: 
> [FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35362) Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

João Boto closed FLINK-35362.
-
Resolution: Duplicate

> Reorganization of flink-connector-jdbc
> --
>
> Key: FLINK-35362
> URL: https://issues.apache.org/jira/browse/FLINK-35362
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: João Boto
>Priority: Major
>
> Described in: 
> [FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35362) Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846727#comment-17846727
 ] 

João Boto commented on FLINK-35362:
---

Closing as a duplicate.

> Reorganization of flink-connector-jdbc
> --
>
> Key: FLINK-35362
> URL: https://issues.apache.org/jira/browse/FLINK-35362
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: João Boto
>Priority: Major
>
> Described in: 
> [FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35368) Reorganize table code

2024-05-15 Thread Jira
João Boto created FLINK-35368:
-

 Summary: Reorganize table code
 Key: FLINK-35368
 URL: https://issues.apache.org/jira/browse/FLINK-35368
 Project: Flink
  Issue Type: Sub-task
Reporter: João Boto






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35367) Reorganize datastream sink and source

2024-05-15 Thread Jira
João Boto created FLINK-35367:
-

 Summary: Reorganize datastream sink and source
 Key: FLINK-35367
 URL: https://issues.apache.org/jira/browse/FLINK-35367
 Project: Flink
  Issue Type: Sub-task
Reporter: João Boto


Reorganize datastream sink and source



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35366) Create all database modules

2024-05-15 Thread Jira
João Boto created FLINK-35366:
-

 Summary: Create all database modules
 Key: FLINK-35366
 URL: https://issues.apache.org/jira/browse/FLINK-35366
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC
Reporter: João Boto


Create all database modules and move related code there



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35365) Reorganize catalog and dialect code

2024-05-15 Thread Jira
João Boto created FLINK-35365:
-

 Summary: Reorganize catalog and dialect code
 Key: FLINK-35365
 URL: https://issues.apache.org/jira/browse/FLINK-35365
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC
Reporter: João Boto


Reorganize code for catalog and dialect



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35364) Create core module and move code

2024-05-15 Thread Jira
João Boto created FLINK-35364:
-

 Summary: Create core module and move code
 Key: FLINK-35364
 URL: https://issues.apache.org/jira/browse/FLINK-35364
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC
Reporter: João Boto


* create core module
* move all code to this new module as is
* transform flink-connector-jdbc into a shaded module



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32828] Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint [flink]

2024-05-15 Thread via GitHub


flinkbot commented on PR #24794:
URL: https://github.com/apache/flink/pull/24794#issuecomment-2113084800

   
   ## CI report:
   
   * 18492c53d984d530f1182c161942ae9a61b84733 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32828:
---
Labels: pull-request-available  (was: )

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition aware watermarks. Watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from savepoint/checkpoint. After it has some events on other partitions the 
> watermark behaviour is correct and watermark is emited as a minimum watarmark 
> from all partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark shoul

[PR] [FLINK-32828] Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint [flink]

2024-05-15 Thread via GitHub


pnowojski opened a new pull request, #24794:
URL: https://github.com/apache/flink/pull/24794

   ## What is the purpose of the change
   
   Properly initialize initial splits in WatermarkOutputMultiplexer
   
   Without this fix, initial splits were registered in the multiplexer only
   when the first record from that split had been emitted. This led to
   incorrectly emitted watermarks, as the resulting watermark was not properly
   combined from the initial splits, but only from the splits that had
   already emitted at least one record.
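   
   A toy illustration of the failure mode (a self-contained sketch of the min-combining idea, not the actual `WatermarkOutputMultiplexer` API): if per-split outputs are only registered lazily, the combined watermark ignores splits that have not emitted yet and can jump ahead.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified min-combining multiplexer to illustrate the bug; not Flink code. */
class ToyWatermarkMultiplexer {
    private final Map<String, Long> perSplitWatermark = new HashMap<>();

    /** With the fix, every initial split is registered up front at Long.MIN_VALUE. */
    void registerSplit(String splitId) {
        perSplitWatermark.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    void onWatermark(String splitId, long watermark) {
        // Without eager registration, this is the first time the split becomes visible.
        perSplitWatermark.merge(splitId, watermark, Math::max);
    }

    long combinedWatermark() {
        return perSplitWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Buggy behaviour: only the split that already emitted a record is known.
        ToyWatermarkMultiplexer lazy = new ToyWatermarkMultiplexer();
        lazy.onWatermark("split-0", 1_000L);
        System.out.println(lazy.combinedWatermark()); // 1000 -- ignores the idle split-1

        // Fixed behaviour: both restored splits are registered before any record arrives.
        ToyWatermarkMultiplexer eager = new ToyWatermarkMultiplexer();
        eager.registerSplit("split-0");
        eager.registerSplit("split-1");
        eager.onWatermark("split-0", 1_000L);
        System.out.println(eager.combinedWatermark()); // Long.MIN_VALUE -- held back correctly
    }
}
```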
   
   
   ## Brief change log
   
   Please check individual commit messages 
   
   ## Verifying this change
   
   This bugfix is covered against future regressions by a new test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30284][flink-metrics] Modify configuration of DataDog http reporter url. [flink]

2024-05-15 Thread via GitHub


anleib commented on code in PR #23610:
URL: https://github.com/apache/flink/pull/23610#discussion_r1601965479


##
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java:
##
@@ -41,9 +41,9 @@ public class DatadogHttpClient {
 private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpClient.class);
 
 private static final String SERIES_URL_FORMAT =
-"https://app.datadoghq.%s/api/v1/series?api_key=%s";;
+"%sapi/v1/series?api_key=%s";

Review Comment:
   missing `/`, should be: `%s/api/v1/series?api_key=%s`
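   A quick check of the two format strings (toy values, not from the reporter configuration) shows why the slash matters:

```java
public class UrlFormatCheck {
    public static void main(String[] args) {
        // Toy values only; "%sapi/..." relies on the configured base URL ending with '/'.
        String base = "https://app.datadoghq.com";
        System.out.println(String.format("%sapi/v1/series?api_key=%s", base, "KEY"));
        // -> https://app.datadoghq.comapi/v1/series?api_key=KEY  (host and path glued together)
        System.out.println(String.format("%s/api/v1/series?api_key=%s", base, "KEY"));
        // -> https://app.datadoghq.com/api/v1/series?api_key=KEY
    }
}
```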



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-20625) Refactor Google Cloud PubSub Source in accordance with FLIP-27

2024-05-15 Thread Claire McCarthy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846709#comment-17846709
 ] 

Claire McCarthy commented on FLINK-20625:
-

Yeah, I am. I got pre-empted by some other work, but it's still very much on the 
radar.

> Refactor Google Cloud PubSub Source in accordance with FLIP-27
> --
>
> Key: FLINK-20625
> URL: https://issues.apache.org/jira/browse/FLINK-20625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Google Cloud PubSub
>Reporter: Jakob Edding
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> The Source implementation for Google Cloud Pub/Sub should be refactored in 
> accordance with [FLIP-27: Refactor Source 
> Interface|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95653748].
> *Split Enumerator*
> Pub/Sub doesn't expose any partitions to consuming applications. Therefore, 
> the implementation of the Pub/Sub Split Enumerator won't do any real work 
> discovery. Instead, a static Source Split is handed to Source Readers which 
> request a Source Split. This static Source Split merely contains details 
> about the connection to Pub/Sub and the concrete Pub/Sub subscription to use 
> but no Split-specific information like partitions/offsets because this 
> information can not be obtained.
> *Source Reader*
> A Source Reader will use Pub/Sub's pull mechanism to read new messages from 
> the Pub/Sub subscription specified in the SourceSplit. In the case of 
> parallel-running Source Readers in Flink, every Source Reader will be passed 
> the same Source Split from the Enumerator. Because of this, all Source 
> Readers use the same connection details and the same Pub/Sub subscription to 
> receive messages. In this case, Pub/Sub will automatically load-balance 
> messages between all Source Readers pulling from the same subscription. This 
> way, parallel processing can be achieved in the Source.
> *At-least-once guarantee*
> Pub/Sub itself guarantees at-least-once message delivery so it is the goal to 
> keep up this guarantee in the Source as well. A mechanism that can be used to 
> achieve this is that Pub/Sub expects a message to be acknowledged by the 
> subscriber to signal that the message has been consumed successfully. Any 
> message that has not been acknowledged yet will be automatically redelivered 
> by Pub/Sub once an ack deadline has passed.
> After a certain time interval has elapsed...
>  # all pulled messages are checkpointed in the Source Reader
>  # same messages are acknowledged to Pub/Sub
>  # same messages are forwarded to downstream Flink tasks
> This should ensure at-least-once delivery in the Source because in the case 
> of failure, non-checkpointed messages have not yet been acknowledged and will 
> therefore be redelivered by the Pub/Sub service.
> Because of the static Source Split, it appears like checkpointing is not 
> necessary in the Split Enumerator.
> *Possible exactly-once guarantee*
> It should even be possible to achieve exactly-once guarantees for the source. 
> The following requirements would have to be met to have an exactly-once mode 
> besides the at-least-once mode similar to how it is done in the [current 
> RabbitMQ 
> Source|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]:
>  * The system which publishes messages to Pub/Sub must add an id to each 
> message so that messages can be deduplicated in the Source.
>  * The Source must run in a non-parallel fashion (with parallelism=1).
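
The ack-after-checkpoint scheme described above, reduced to its bookkeeping, could look roughly like the following. This is a plain-Java sketch with made-up names; it uses no Flink or Pub/Sub APIs.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy bookkeeping for at-least-once: ack ids are only released once the checkpoint completes. */
class AckAfterCheckpointTracker {
    private final List<String> pendingAckIds = new ArrayList<>();
    private final Map<Long, List<String>> ackIdsPerCheckpoint = new HashMap<>();

    /** Called for every pulled message; the ack id is held back, not acked yet. */
    void onMessage(String ackId) {
        pendingAckIds.add(ackId);
    }

    /** Called when a checkpoint is triggered: snapshot the ids seen since the last one. */
    void onCheckpoint(long checkpointId) {
        ackIdsPerCheckpoint.put(checkpointId, new ArrayList<>(pendingAckIds));
        pendingAckIds.clear();
    }

    /** Called when the checkpoint is confirmed: only now is it safe to ack to Pub/Sub. */
    List<String> onCheckpointComplete(long checkpointId) {
        // Anything never returned here (e.g. after a failure) stays unacked and is redelivered.
        return ackIdsPerCheckpoint.remove(checkpointId);
    }
}
{code}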



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Summary: Partition aware watermark not handled correctly shortly after job 
start up from checkpoint or savepoint  (was: Kafka partition aware watermark 
not handled correctly shortly after job start up from checkpoint or savepoint)

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition aware watermarks. Watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from savepoint/checkpoint. After it has some events on other partitions the 
> watermark behaviour is correct and watermark is emited as a minimum watarmark 
> from all partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Receiv
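
For readers who want to reproduce the setup described above, the following sketch shows
the job wiring in outline. It is not the attached test-job.java, just an illustration of
the described configuration (bounded out-of-orderness watermarks, parallelism lower than
the partition count); broker address, topic, and group id are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class WatermarkReproJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // so the job can later be restarted from a checkpoint

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test-2")                       // topic with 4 partitions
                .setGroupId("watermark-repro")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(
                        source,
                        // per-partition watermarks, 10s bounded out-of-orderness
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)),
                        "kafka-source")
                .setParallelism(2) // lower than the number of partitions
                .print();

        env.execute("watermark-repro");
    }
}
```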

[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846694#comment-17846694
 ] 

Piotr Nowojski edited comment on FLINK-32828 at 5/15/24 4:27 PM:
-

We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only when the first record from that split 
has been emitted. The resulting watermark was therefore not properly combined 
from the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.

edit: Also, the problem doesn't affect only Kafka, but all FLIP-27 sources 
(anything that uses {{SourceOperator}}).
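
For intuition, the following toy sketch (not Flink's actual {{WatermarkOutputMultiplexer}}
API) shows why a split that has not been registered cannot hold the combined watermark
back; the fix described above amounts to registering every split recovered from state up
front rather than lazily on its first record.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy min-combining multiplexer, for illustration only. */
class ToyWatermarkMultiplexer {

    private final Map<String, Long> perSplitWatermark = new HashMap<>();

    // If a recovered split is only registered when its first record arrives, it does
    // not take part in the min() below until then, so the combined watermark can jump
    // ahead of the split's real position.
    void registerSplit(String splitId) {
        perSplitWatermark.put(splitId, Long.MIN_VALUE);
    }

    void onWatermark(String splitId, long watermark) {
        perSplitWatermark.put(splitId, watermark);
    }

    long combinedWatermark() {
        return perSplitWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```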


was (Author: pnowojski):
We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only when the first record from that split 
has been emitted. The resulting watermark was therefore not properly combined 
from the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events just after job startup 
> from savepoint/checkpoint. After there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on sin

[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/15/24 4:21 PM:


When running in batch mode, if we do:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    out.collect(111);
}

This is a new record being emitted and it has no existing timestamp.

Then the collected value has timestamp Long.MAX_VALUE. The timestamp assigned to 
the collected element appears to be the timestamp for which the timer was 
registered to fire, which is Long.MAX_VALUE.


was (Author: JIRAUSER302517):
When running in batch mode, if we do:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    out.collect(111);
}

Then the collected value has timestamp Long.MAX_VALUE. The timestamp assigned to 
the collected element appears to be the timestamp for which the timer was 
registered to fire, which is Long.MAX_VALUE.

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode all registered timers will fire at the _end of time_. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements that are collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally this should be the time at which the batch actually 
> executed the onTimer function.
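
Spelled out as a runnable function, the reported behaviour corresponds to something like
the sketch below (key and value types are illustrative): in batch mode the timer fires at
the end of the input, and the element emitted from onTimer carries the timer's timestamp,
Long.MAX_VALUE, rather than the time at which the timer was actually evaluated.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Minimal reproduction of the behaviour described in the issue. */
class MaxValueTimerFunction extends KeyedProcessFunction<String, Integer, Integer> {

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        // Register a timer at the "end of time"; in batch mode it fires once all input is consumed.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
        // The emitted record inherits `timestamp`, i.e. Long.MAX_VALUE.
        out.collect(111);
    }
}
```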



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/15/24 4:16 PM:


When running in batch mode, if we do:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    out.collect(111);
}

Then the collected value has timestamp Long.MAX_VALUE. The timestamp assigned to 
the collected element appears to be the timestamp for which the timer was 
registered to fire, which is Long.MAX_VALUE.


was (Author: JIRAUSER302517):
If we do:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    out.collect(111);
}

Then the collected value has timestamp Long.MAX_VALUE. The timestamp assigned to 
the collected element appears to be the timestamp for which the timer was 
registered to fire, which is Long.MAX_VALUE.

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode all registered timers will fire at the _end of time_. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements that are collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally this should be the time at which the batch actually 
> executed the onTimer function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700
 ] 

Kanthi Vaidya commented on FLINK-35289:
---

If we do:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    out.collect(111);
}

Then the collected value has timestamp Long.MAX_VALUE. The timestamp assigned to 
the collected element appears to be the timestamp for which the timer was 
registered to fire, which is Long.MAX_VALUE.

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode all registered timers will fire at the _end of time_. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements that are collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally this should be the time at which the batch actually 
> executed the onTimer function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35363) Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira
João Boto created FLINK-35363:
-

 Summary: Reorganization of flink-connector-jdbc
 Key: FLINK-35363
 URL: https://issues.apache.org/jira/browse/FLINK-35363
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


Described in: 
[FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35362) Reorganization of flink-connector-jdbc

2024-05-15 Thread Jira
João Boto created FLINK-35362:
-

 Summary: Reorganization of flink-connector-jdbc
 Key: FLINK-35362
 URL: https://issues.apache.org/jira/browse/FLINK-35362
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


Described in: 
[FLIP-449|https://cwiki.apache.org/confluence/display/FLINK/FLIP-449%3A+Reorganization+of+flink-connector-jdbc]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Fsip module 2b [flink-training]

2024-05-15 Thread via GitHub


manoellins closed pull request #82: Fsip module 2b
URL: https://github.com/apache/flink-training/pull/82


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601908776


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id")
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;
+
+public PubSubSinkV2Builder setProjectId(String projectId) {
+this.projectId = projectId;
+return this;
+}
+
+public PubSubSinkV2Builder setTopicId(String topicId) {
+this.topicId = topicId;
+return this;
+}
+
+public PubSubSinkV2Builder setSerializationSchema(
+SerializationSchema serializationSchema) {
+this.serializationSchema = serializationSchema;
+return this;
+}
+
+public PubSubSinkV2Builder setGcpPublisherConfig(GcpPublisherConfig 
gcpPublisherConfig) {
+this.gcpPublisherConfig = gcpPublisherConfig;
+return this;
+}
+
+public PubSubSinkV2Builder setNumMaxInflightRequests(int 
numMaxInflightRequests) {
+this.numMaxInflightRequests = numMaxInflightRequests;

Review Comment:
   what happens if we pass a negative number?
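
   One possible answer, purely as an illustration and not the PR's actual implementation,
   is to fail fast on non-positive values with Flink's Preconditions before the value ever
   reaches the publisher; the helper class below is hypothetical.

```java
import org.apache.flink.util.Preconditions;

/** Illustrative guard only, not the PR's code. */
final class InflightRequestsValidator {

    static int validate(int numMaxInflightRequests) {
        // Reject zero and negative limits instead of passing them through silently.
        Preconditions.checkArgument(
                numMaxInflightRequests > 0,
                "numMaxInflightRequests must be positive, was %s",
                numMaxInflightRequests);
        return numMaxInflightRequests;
    }
}
```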



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601907967


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on 
{@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and 
blocks new writes if the
+ *  number of inflight requests exceeds the specified limit.
+ *
+ * @param  The type of the records .
+ */
+@Internal
+public class PubSubWriter implements SinkWriter {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;

Review Comment:
   Can we align types?
   here it is `long` while in `PubSubSinkV2Builder` it is `int`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846694#comment-17846694
 ] 

Piotr Nowojski commented on FLINK-32828:


We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only when the first record from that split 
has been emitted. The resulting watermark was therefore not properly combined 
from the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events just after job startup 
> from savepoint/checkpoint. After there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, ti

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601895646


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.

Review Comment:
   ```suggestion
*   {@code maximumInflightMessages} for the maximum number of inflight 
messages.
   ```
   I would suggest we use the same word everywhere rather than its different 
variations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Priority: Critical  (was: Major)

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events just after job startup 
> from savepoint/checkpoint. After there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when rec

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Affects Version/s: 1.18.1
   1.19.0

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events just after job startup 
> from savepoint/checkpoint. After there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit a

[jira] [Assigned] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-32828:
--

Assignee: Piotr Nowojski

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events just after job startup 
> from savepoint/checkpoint. After there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progress*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when receivi

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601877099


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {
+
+private static final int MAXIMUM_INFLIGHT_MESSAGES = 3;
+
+@Test
+void writeMessageDeliversMessageUsingClient() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+PubSubWriter writer =
+getDefaultWriter(client, new TestSinkInitContext(), 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+writer.write(message, null);
+client.deliverMessage(message);
+writer.flush(false);
+
+
Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message);
+}
+
+@Test
+void writeMessageIncrementsMetricsOnDelivery() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+WriterInitContext context = new TestSinkInitContext();
+
+PubSubWriter writer = getDefaultWriter(client, context, 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+
+// write message
+writer.write(message, null);
+
+// get metrics before delivery
+Counter numBytesOutCounter = 
context.metricGroup().getNumBytesSendCounter();
+Counter numRecordsSendCounter = 
context.metricGroup().getNumRecordsSendCounter();
+long recordsSentBeforeDelivery = numRecordsSendCounter.getCount();
+long bytesSentBeforeDelivery = numBytesOutCounter.getCount();
+
+// deliver message
+client.deliverMessage(message);
+writer.flush(false);
+
+long messageSize =
+PubsubMessage.newBuilder()
+.setData(ByteString.copyFromUtf8(message))
+.build()
+.getSerializedSize();
+
+Assertions.assertThat(recordsSentBeforeDelivery).isEqualTo(0);
+Assertions.assertThat(bytesSentBeforeDelivery).isEqualTo(0);
+Assertions.assertThat(numRecordsSendCounter.getCount()).isEqualTo(1);
+
Assertions.assertThat(numBytesOutCounter.getCount()).isEqualTo(messageSize);

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876558


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {
+
+private static final int MAXIMUM_INFLIGHT_MESSAGES = 3;
+
+@Test
+void writeMessageDeliversMessageUsingClient() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+PubSubWriter writer =
+getDefaultWriter(client, new TestSinkInitContext(), 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+writer.write(message, null);
+client.deliverMessage(message);
+writer.flush(false);
+
+
Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message);

Review Comment:
   `static import`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876167


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {

Review Comment:
   ```suggestion
   class PubSubWriterTest {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601875238


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java:
##
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/** Tests for {@link PubSubSinkV2Builder}. */
+public class PubSubSinkV2BuilderTest {

Review Comment:
   ```suggestion
   class PubSubSinkV2BuilderTest {
   ```
   JUnit 5 doesn't need `public`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601874397


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java:
##
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/** Tests for {@link PubSubSinkV2Builder}. */
+public class PubSubSinkV2BuilderTest {
+
+@Test
+void builderBuildsSinkWithCorrectProperties() {
+PubSubSinkV2Builder builder = PubSubSinkV2.builder();
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+.setCredentialsProvider(NoCredentialsProvider.create())
+.build();
+
+SerializationSchema serializationSchema = new 
SimpleStringSchema();
+
+builder.setProjectId("test-project-id")
+.setTopicId("test-topic-id")
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setSerializationSchema(serializationSchema)
+.setNumMaxInflightRequests(10)
+.setFailOnError(true);
+PubSubSinkV2 sink = builder.build();
+
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("projectId", 
"test-project-id");
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("topicId", 
"test-topic-id");
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("serializationSchema", 
serializationSchema);
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("publisherConfig", 
gcpPublisherConfig);
+
Assertions.assertThat(sink).hasFieldOrPropertyWithValue("maxInFlightRequests", 
10);
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("failOnError", 
true);

Review Comment:
   Normally in Flink code we use a `static import` for `Assertions`; I think we 
should follow the same convention here.
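
   For reference, the style the reviewer is pointing at looks like this minimal,
   hypothetical test class (not part of the PR):

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

class StaticImportStyleExample {

    @Test
    void readsAsPlainAssertThatCalls() {
        // With the static import, assertions read without the Assertions prefix.
        assertThat(1 + 1).isEqualTo(2);
    }
}
```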



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


