[jira] [Commented] (FLINK-35411) Optimize wait logic in draining of async state requests

2024-05-22 Thread liuzhuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848833#comment-17848833
 ] 

liuzhuo commented on FLINK-35411:
-

[~zakelly] I am very interested in changes related to FLIP-423. If you have a 
suitable issue, please let me know.

> Optimize wait logic in draining of async state requests
> ---
>
> Key: FLINK-35411
> URL: https://issues.apache.org/jira/browse/FLINK-35411
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>
> Currently during draining of async state requests, the task thread performs 
> {{Thread.sleep}} to avoid cpu overhead when polling mails. This can be 
> optimized by wait & notify.
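
The wait & notify idea in the description can be sketched in plain Java. This is 
a minimal illustration, not the Flink implementation: `InFlightTracker`, 
`register`, `complete`, `drain` and `remaining` are hypothetical names, and the 
sketch ignores the mail processing that the real task thread also has to do 
while draining.

```java
/** Hypothetical sketch; none of these names come from the Flink code base. */
public class DrainWaitNotifySketch {

    /** Counts in-flight requests and lets one thread block until none remain. */
    static final class InFlightTracker {
        private final Object lock = new Object();
        private int inFlight;

        /** Called when a request is submitted. */
        void register() {
            synchronized (lock) {
                inFlight++;
            }
        }

        /** Called by a worker when a request completes; wakes the drainer at zero. */
        void complete() {
            synchronized (lock) {
                inFlight--;
                if (inFlight == 0) {
                    lock.notifyAll();
                }
            }
        }

        /** Blocks without polling; the loop guards against spurious wakeups. */
        void drain() throws InterruptedException {
            synchronized (lock) {
                while (inFlight > 0) {
                    lock.wait();
                }
            }
        }

        int remaining() {
            synchronized (lock) {
                return inFlight;
            }
        }
    }

    /** Registers requests, completes them on worker threads, drains, returns what is left. */
    static int runDemo(int requests) {
        InFlightTracker tracker = new InFlightTracker();
        Thread[] workers = new Thread[requests];
        for (int i = 0; i < requests; i++) {
            tracker.register();
            workers[i] = new Thread(tracker::complete);
        }
        for (Thread worker : workers) {
            worker.start();
        }
        try {
            tracker.drain();
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return tracker.remaining();
    }

    public static void main(String[] args) {
        System.out.println("in flight after drain: " + runDemo(4));
    }
}
```

Compared with a sleep loop, the draining thread here wakes only when the last 
request completes, so it neither burns CPU nor adds up to one sleep interval of 
latency.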



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished [flink]

2024-05-22 Thread via GitHub


fredia commented on code in PR #24817:
URL: https://github.com/apache/flink/pull/24817#discussion_r1611094596


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##
@@ -53,7 +57,8 @@ public class TaskExecutorFileMergingManager {
  * manager(executor).
  */
 @GuardedBy("lock")
-private final Map 
fileMergingSnapshotManagerByJobId;

Review Comment:
   I'm not sure, but would using  directly as the key be enough, to avoid 
nesting?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##
@@ -74,8 +79,9 @@ public TaskExecutorFileMergingManager() {
  * Initialize file merging snapshot manager for each job according 
configurations when {@link
  * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
  */
-public @Nullable FileMergingSnapshotManager 
fileMergingSnapshotManagerForJob(
+public @Nullable FileMergingSnapshotManager 
fileMergingSnapshotManagerForTask(

Review Comment:
   How about renaming it to `fileMergingSnapshotManagerForAttempt`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-22 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-35359:


Assignee: Xuannan Su

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better 
> experience with the existing configuration. In this FLIP, we outline several 
> general improvements to the current configuration:
>  * Ensure all the ConfigOptions are properly annotated
>  * Ensure all user-facing configurations are included in the documentation 
> generation process
>  * Make the existing ConfigOptions use the proper type
>  * Mark all internally used ConfigOptions with the @Internal annotation
>  
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>  





Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-22 Thread via GitHub


afedulov commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1611060409


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##
@@ -98,16 +108,17 @@ static  CsvBulkWriter forPojo(Class 
pojoClass, FSDataOutputStr
 @Override
 public void addElement(T element) throws IOException {
 final R r = converter.convert(element, converterContext);
-csvWriter.writeValue(stream, r);
+csvWriter.writeValue(generator, r);
 }
 
 @Override
 public void flush() throws IOException {
-stream.flush();
+generator.flush();
 }
 
 @Override
 public void finish() throws IOException {
+generator.close();

Review Comment:
   @GOODBOY008 properly testing this behavior with a unit test might be tricky. 
I wrote a quick sketch of an integration test that you could consider making 
use of:
   
https://github.com/afedulov/flink/blob/fix-csv-flush-test/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
   I did not spend much time coming up with proper assertions; you can surely 
come up with something more elegant.
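
   For readers following the thread, the effect the PR title describes (a flush 
per record versus a flush only on demand) can be illustrated without Jackson. 
The sketch below is an assumption-laden stand-in, not the PR's test: it uses 
plain `java.io` writers, and `FlushCountSketch`, `CountingStream` and 
`countFlushes` are hypothetical names.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch; it does not use CsvBulkWriter or Jackson. */
public class FlushCountSketch {

    /** Counts how often flush() reaches the underlying stream. */
    static final class CountingStream extends FilterOutputStream {
        int flushes;

        CountingStream(OutputStream out) {
            super(out);
        }

        @Override
        public void flush() throws IOException {
            flushes++;
            super.flush();
        }
    }

    /** Writes CSV-like records and returns the number of flushes seen by the stream. */
    static int countFlushes(int records, boolean flushPerRecord) {
        CountingStream stream = new CountingStream(new ByteArrayOutputStream());
        try {
            Writer writer =
                    new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8));
            for (int i = 0; i < records; i++) {
                writer.write(i + ",value\n");
                if (flushPerRecord) {
                    // Mimics a writer that flushes after every value.
                    writer.flush();
                }
            }
            // Explicit flush, as a caller would request at a checkpoint or on finish.
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stream.flushes;
    }

    public static void main(String[] args) {
        System.out.println("flush per record: " + countFlushes(100, true));
        System.out.println("flush on demand:  " + countFlushes(100, false));
    }
}
```

   A test along these lines could assert that the flush count stays independent 
of the record count once per-record flushing is disabled.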






Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-22 Thread via GitHub


afedulov commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1611060409


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##
@@ -98,16 +108,17 @@ static  CsvBulkWriter forPojo(Class 
pojoClass, FSDataOutputStr
 @Override
 public void addElement(T element) throws IOException {
 final R r = converter.convert(element, converterContext);
-csvWriter.writeValue(stream, r);
+csvWriter.writeValue(generator, r);
 }
 
 @Override
 public void flush() throws IOException {
-stream.flush();
+generator.flush();
 }
 
 @Override
 public void finish() throws IOException {
+generator.close();

Review Comment:
   @GOODBOY008 properly testing this behavior with a unit test might be tricky. 
I wrote a quick sketch of an integration test that you could consider making 
use of:
   
https://github.com/afedulov/flink/blob/fix-csv-flush-test/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterTest.java
   I did not spend much time coming up with proper assertions; you can surely 
come up with something more elegant.






Re: [PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]

2024-05-22 Thread via GitHub


zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1609784148


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler 
createSpeculativeExecutionHandler(
 protected void startSchedulingInternal() {
 speculativeExecutionHandler.init(
 getExecutionGraph(), getMainThreadExecutor(), 
jobManagerJobMetricGroup);
+jobRecoveryHandler.initialize(
+log,
+getExecutionGraph(),
+shuffleMaster,
+getMainThreadExecutor(),
+failoverStrategy,
+this::failJob,
+this::resetVerticesInRecovering,
+this::updateResultPartitionBytesMetrics,
+this::initializeJobVertex,
+this::updateTopology);
+
+if (jobRecoveryHandler.needRecover()) {
+getMainThreadExecutor()
+.schedule(
+() ->
+jobRecoveryHandler.startRecovering(
+this::onRecoveringFinished, 
this::onRecoveringFailed),
+previousWorkerRecoveryTimeout.toMillis(),
+TimeUnit.MILLISECONDS);
+} else {
+tryComputeSourceParallelismThenRunAsync(
+(Void value, Throwable throwable) -> {
+if (getExecutionGraph().getState() == 
JobStatus.CREATED) {
+initializeVerticesIfPossible();
+super.startSchedulingInternal();
+}
+});
+}
+}
+
+@Override
+protected void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+FailureHandlingResult wrappedResult = failureHandlingResult;
+if (failureHandlingResult.canRestart()) {
+Set originalNeedToRestartVertices =
+failureHandlingResult.getVerticesToRestart();
+
+Set extraNeedToRestartJobVertices =
+originalNeedToRestartVertices.stream()
+.map(ExecutionVertexID::getJobVertexId)
+.filter(requiredRestartJobVertices::contains)
+.collect(Collectors.toSet());
+
+
requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+Set needToRestartVertices =
+extraNeedToRestartJobVertices.stream()
+.flatMap(
+jobVertexId -> {
+ExecutionJobVertex jobVertex =
+
getExecutionJobVertex(jobVertexId);
+return 
Arrays.stream(jobVertex.getTaskVertices())
+.map(ExecutionVertex::getID);
+})
+.collect(Collectors.toSet());
+needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+wrappedResult =
+FailureHandlingResult.restartable(
+
failureHandlingResult.getFailedExecution().orElse(null),
+failureHandlingResult.getError(),
+failureHandlingResult.getTimestamp(),
+failureHandlingResult.getFailureLabels(),
+needToRestartVertices,
+failureHandlingResult.getRestartDelayMS(),
+failureHandlingResult.isGlobalFailure(),
+failureHandlingResult.isRootCause());
+}
+
+super.maybeRestartTasks(wrappedResult);
+}
+
+@VisibleForTesting
+boolean isRecovering() {
+return jobRecoveryHandler.isRecovering();
+}
+
+@Override
+public boolean updateTaskExecutionState(final TaskExecutionStateTransition 
taskExecutionState) {
+boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+if (success
+&& taskExecutionState.getExecutionState() == 
ExecutionState.FINISHED
+&& !isRecovering()) {
+final ExecutionVertexID executionVertexId =
+taskExecutionState.getID().getExecutionVertexId();
+jobRecoveryHandler.notifyExecutionFinished(executionVertexId, 
taskExecutionState);
+}
+return success;
+}
+
+@Override
+protected void resetForNewExecutions(Collection 
vertices) {
+super.resetForNewExecutions(vertices);
+if (!isRecovering()) {
+jobRecoveryHandler.notifyExecutionVertexReset(vertices);
+}
+}
+
+private void initializeJobVertex(
+  

Re: [PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]

2024-05-22 Thread via GitHub


JunRuiLee commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1611044994


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultBatchJobRecoveryHandler.java:
##
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexInitializedEvent;
+import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
+import org.apache.flink.runtime.jobmaster.event.ExecutionVertexResetEvent;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+import org.apache.flink.runtime.jobmaster.event.JobEventManager;
+import org.apache.flink.runtime.jobmaster.event.JobEventReplayHandler;
+import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.DefaultShuffleMasterSnapshotContext;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator.NO_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Default implementation of {@link BatchJobRecoveryHandler} and {@link 
JobEventReplayHandler}. */
+public class DefaultBatchJobRecoveryHandler
+implements BatchJobRecoveryHandler, JobEventReplayHandler {
+
+private Logger log;
+
+private final JobEventMana

Re: [PR] [FLINK-35359][config] General Improvement to Configuration for Flink 2.0 [flink]

2024-05-22 Thread via GitHub


Sxnan commented on PR #24815:
URL: https://github.com/apache/flink/pull/24815#issuecomment-2126293155

   @xintongsong  Thanks for the review! I updated the PR accordingly. Please 
take another look.





Re: [PR] [FLINK-34123][core][type] Introduce built-in serialization support for map and lists [flink]

2024-05-22 Thread via GitHub


X-czh commented on PR #24634:
URL: https://github.com/apache/flink/pull/24634#issuecomment-2126292943

   > Thanks @X-czh, I think we only need two commits. One for the 
implementation and one for documentation.
   
   Thanks for the notice, @reswqa. I've rearranged the commits to have only two 
commits: one for impl, one for doc.





Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-05-22 Thread via GitHub


jectpro7 commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1610081555


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The Bunch Put access request for ForStDB.
+ *
+ * @param  The type of key in original state access request.
+ */
+public class ForStDBBunchPutRequest extends 
ForStDBPutRequest, Map> {
+
+/** Serializer for the user values. */
+final TypeSerializer userValueSerializer;
+
+/** The data outputStream used for value serializer, which should be 
thread-safe. */
+final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which should be 
thread-safe. */
+final ThreadLocal valueDeserializerView;
+
+public ForStDBBunchPutRequest(
+ContextKey key, Map value, ForStMapState table, 
InternalStateFuture future) {
+super(key, value, table, future);
+Preconditions.checkArgument(table instanceof ForStMapState);

Review Comment:
   This check seems redundant.



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 public CompletableFuture process() {
 return CompletableFuture.runAsync(
 () -> {
-try (WriteBatch writeBatch =
-new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+try (ForStDBWriteBatchWrapper writeBatch =
+new ForStDBWriteBatchWrapper(db, writeOptions, 
batchRequest.size())) {
 for (ForStDBPutRequest request : batchRequest) {
+ColumnFamilyHandle cf = 
request.getColumnFamilyHandle();
 if (request.valueIsNull()) {
-// put(key, null) == delete(key)
-writeBatch.delete(
-request.getColumnFamilyHandle(),
-request.buildSerializedKey());
+if (request instanceof ForStDBBunchPutRequest) 
{
+ForStDBBunchPutRequest bunchPutRequest =
+(ForStDBBunchPutRequest) 
request;
+byte[] primaryKey = 
bunchPutRequest.buildSerializedKey(null);

Review Comment:
   Sorry, I don't understand this line: why serialize `null` here? Why do 
`Bunch remove` and `Single remove` use different serialization methods?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.s

Re: [PR] [FLINK-34123][core][type] Introduce built-in serialization support for map and lists [flink]

2024-05-22 Thread via GitHub


reswqa commented on PR #24634:
URL: https://github.com/apache/flink/pull/24634#issuecomment-2126264511

   Thanks @X-czh. Let's wait for the CI to pass.





Re: [PR] [FLINK-34123][core][type] Introduce built-in serialization support for map and lists [flink]

2024-05-22 Thread via GitHub


reswqa commented on code in PR #24634:
URL: https://github.com/apache/flink/pull/24634#discussion_r1610998637


##
docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md:
##
@@ -37,15 +37,16 @@ Flink places some restrictions on the type of elements that 
can be in a DataStre
 The reason for this is that the system analyzes the types to determine
 efficient execution strategies.
 
-There are seven different categories of data types:
+There are eight different categories of data types:
 
 1. **Java Tuples** and **Scala Case Classes**
 2. **Java POJOs**
 3. **Primitive Types**
-4. **Regular Classes**
-5. **Values**
-6. **Hadoop Writables**
-7. **Special Types**
+4. **Common Collection Types**

Review Comment:
   Oops, I didn't notice that. :)






Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-22 Thread via GitHub


GOODBOY008 commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1610997416


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##
@@ -98,16 +108,17 @@ static  CsvBulkWriter forPojo(Class 
pojoClass, FSDataOutputStr
 @Override
 public void addElement(T element) throws IOException {
 final R r = converter.convert(element, converterContext);
-csvWriter.writeValue(stream, r);
+csvWriter.writeValue(generator, r);
 }
 
 @Override
 public void flush() throws IOException {
-stream.flush();
+generator.flush();
 }
 
 @Override
 public void finish() throws IOException {
+generator.close();

Review Comment:
   I will add a unit test to verify the desired behavior.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987543


##
flink-connector-aws-base/pom.xml:
##
@@ -76,6 +76,12 @@ under the License.
 test
 
 
+

Review Comment:
   removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987837


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for 
{@link
+ * SqsSink}. More details on the internals of this sink writer may be found in 
{@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties 
sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+
SqsExceptionClassifiers.getAccessDeniedEx

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610983272


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the 
Localstack container.
+ */
+public class SqsTestUtils {
+
+private static final ObjectMapper MAPPER = createObjectMapper();
+
+public static SqsClient createSqsClient(String endpoint, SdkHttpClient 
httpClient) {
+return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, 
SqsClient.builder());
+}
+
+public static DataStream getSampleDataGenerator(
+StreamExecutionEnvironment env, int endValue) {
+return env.fromSequence(1, endValue)
+.map(Object::toString)
+.returns(String.class)
+.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", 
data)));
+}
+
+public static List getSampleData(int endValue) throws 
JsonProcessingException {
+List expectedElements = new ArrayList<>();
+for (int i = 1; i <= endValue; i++) {
+expectedElements.add(
+MAPPER.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+}
+return expectedElements;
+}
+
+private static ObjectMapper createObjectMapper() {
+ObjectMapper objectMapper = new ObjectMapper();
+registerModules(objectMapper);
+return objectMapper;
+}
+
+private static void registerModules(ObjectMapper mapper) {
+mapper.registerModule(new JavaTimeModule())

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610974483


##
flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java:
##
@@ -114,6 +116,14 @@ public static void createBucket(S3Client s3Client, String 
bucketName) {
 }
 }
 
+public static void createSqs(String sqsName, SqsClient sqsClient) {

Review Comment:
   Moved to SqsTestUtils






[jira] [Assigned] (FLINK-35411) Optimize wait logic in draining of async state requests

2024-05-22 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35411:
---

Assignee: Yanfei Lei  (was: Zakelly Lan)

> Optimize wait logic in draining of async state requests
> ---
>
> Key: FLINK-35411
> URL: https://issues.apache.org/jira/browse/FLINK-35411
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>
> Currently during draining of async state requests, the task thread performs 
> {{Thread.sleep}} to avoid cpu overhead when polling mails. This can be 
> optimized by wait & notify.
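
To illustrate the optimization described in the issue, here is a minimal sketch of replacing a sleep-based polling loop with a blocking wait that is signalled on completion. It is not Flink's implementation: `InFlightTracker` and all of its methods are hypothetical names, and it uses `java.util.concurrent.locks.Condition` as the lock-based equivalent of `wait`/`notify`. It also assumes the draining thread only has to wait, whereas a real task thread would additionally process mails while draining.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical tracker of in-flight async requests; not a Flink class. */
final class InFlightTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private int inFlight;

    /** Called when a request is submitted. */
    void onRequestStarted() {
        lock.lock();
        try {
            inFlight++;
        } finally {
            lock.unlock();
        }
    }

    /** Called by a worker thread when a request completes. */
    void onRequestFinished() {
        lock.lock();
        try {
            if (--inFlight == 0) {
                drained.signalAll(); // wake the draining thread right away
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until nothing is in flight, without a sleep-and-poll loop. */
    void drain() {
        lock.lock();
        try {
            while (inFlight > 0) { // the loop guards against spurious wakeups
                drained.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns the current number of in-flight requests. */
    int inFlight() {
        lock.lock();
        try {
            return inFlight;
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the draining thread wakes as soon as the last request finishes, rather than after the next sleep interval elapses.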



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35412) Batch execution of async state request callback

2024-05-22 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35412:
---

Assignee: Zakelly Lan

> Batch execution of async state request callback
> ---
>
> Key: FLINK-35412
> URL: https://issues.apache.org/jira/browse/FLINK-35412
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> There is one mail for each callback when async state result returns. One 
> possible optimization is to encapsulate multiple callbacks into one mail.
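
The batching idea in the description can be sketched as follows. This is only an illustration under stated assumptions: `CallbackBatcher` is a hypothetical class, a "mail" is modelled as a plain `Runnable`, and the mailbox is any `Consumer<Runnable>`; none of these names come from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher that wraps several callbacks into one mail. */
final class CallbackBatcher {
    private final List<Runnable> pending = new ArrayList<>();
    private final int batchSize;
    private final Consumer<Runnable> mailbox;

    CallbackBatcher(int batchSize, Consumer<Runnable> mailbox) {
        this.batchSize = batchSize;
        this.mailbox = mailbox;
    }

    /** Buffers a callback and submits one mail once the batch is full. */
    synchronized void add(Runnable callback) {
        pending.add(callback);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Submits all buffered callbacks as a single mail, if there are any. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<Runnable> batch = new ArrayList<>(pending);
        pending.clear();
        // One mail runs every callback of the batch in submission order.
        mailbox.accept(() -> batch.forEach(Runnable::run));
    }
}
```

With a batch size of 3, seven callbacks followed by a final `flush()` produce three mails instead of seven.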





[jira] [Commented] (FLINK-35411) Optimize wait logic in draining of async state requests

2024-05-22 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848812#comment-17848812
 ] 

Zakelly Lan commented on FLINK-35411:
-

[~spoon-lz] Actually, this was found by some of our internal benchmarks, and 
[~Yanfei Lei] has made a draft resolving it, which is still being validated. I 
forgot to assign the issue, sorry about that. We have more work to do, which I 
think you may be interested in getting involved in. 

> Optimize wait logic in draining of async state requests
> ---
>
> Key: FLINK-35411
> URL: https://issues.apache.org/jira/browse/FLINK-35411
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Zakelly Lan
>Priority: Major
>
> Currently during draining of async state requests, the task thread performs 
> {{Thread.sleep}} to avoid cpu overhead when polling mails. This can be 
> optimized by wait & notify.





[jira] [Assigned] (FLINK-35411) Optimize wait logic in draining of async state requests

2024-05-22 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35411:
---

Assignee: Zakelly Lan

> Optimize wait logic in draining of async state requests
> ---
>
> Key: FLINK-35411
> URL: https://issues.apache.org/jira/browse/FLINK-35411
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> Currently during draining of async state requests, the task thread performs 
> {{Thread.sleep}} to avoid cpu overhead when polling mails. This can be 
> optimized by wait & notify.





Re: [PR] [FLINK-35355][State] Internal async aggregating state and corresponding state descriptor [flink]

2024-05-22 Thread via GitHub


Zakelly commented on PR #24810:
URL: https://github.com/apache/flink/pull/24810#issuecomment-2126218813

   @jectpro7 It seems the author of this commit is different from the committer. 
Would you mind changing that? Otherwise I can merge it as it is.





Re: [PR] [FLINK-34123][core][type] Introduce built-in serialization support for map and lists [flink]

2024-05-22 Thread via GitHub


X-czh commented on PR #24634:
URL: https://github.com/apache/flink/pull/24634#issuecomment-2126217002

   @reswqa Thanks for the review. I've rebased onto the latest master branch, and 
the supported types are now highlighted in the doc content as follows:
   > Currently, only `Map`, `List` and its super interface `Collection` are 
supported.
   
   Would you mind taking a look again when you have time?





[jira] [Assigned] (FLINK-34914) FLIP-436: Introduce Catalog-related Syntax

2024-05-22 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-34914:
--

Assignee: Yubin Li

> FLIP-436: Introduce Catalog-related Syntax
> --
>
> Key: FLINK-34914
> URL: https://issues.apache.org/jira/browse/FLINK-34914
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>
> Umbrella issue for: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax





Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is closed [flink]

2024-05-22 Thread via GitHub


masteryhx closed pull request #24768: [FLINK-35047][state] Shutdown 
StateExecutors when ForStKeyedStateBackend is closed
URL: https://github.com/apache/flink/pull/24768





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935419


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the 
Localstack container.
+ */
+public class SqsTestUtils {
+
+private static final ObjectMapper MAPPER = createObjectMapper();
+
+public static SqsClient createSqsClient(String endpoint, SdkHttpClient 
httpClient) {
+return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, 
SqsClient.builder());
+}
+
+public static DataStream getSampleDataGenerator(
+StreamExecutionEnvironment env, int endValue) {
+return env.fromSequence(1, endValue)
+.map(Object::toString)
+.returns(String.class)
+.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", 
data)));
+}
+
+public static List getSampleData(int endValue) throws 
JsonProcessingException {

Review Comment:
   Removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935001


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {

Review Comment:
   add new test case - shouldClassifySocketTimeoutExceptionAsNonFatal






[jira] [Assigned] (FLINK-34123) Introduce built-in serialization support for Map and List

2024-05-22 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-34123:
--

Assignee: Zhanghao Chen

> Introduce built-in serialization support for Map and List
> -
>
> Key: FLINK-34123
> URL: https://issues.apache.org/jira/browse/FLINK-34123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for Map and List, two common 
> collection types for which Flink already has custom serializers implemented.





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929881


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+@Test
+public void shouldClassifyNotAuthorizedAsFatal() {
+AwsServiceException sqsException =
+SqsException.builder()
+.awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+.build();
+
+// isFatal returns `true` if an exception is non-fatal
+assertFalse(classifier.isFatal(sqsException, ex -> {}));

Review Comment:
   Based on the `isFatal` implementation ("isFatal returns `true` if an exception 
is non-fatal"), we use `assertFalse` here.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929610


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+@Test
+public void shouldClassifyNotAuthorizedAsFatal() {
+AwsServiceException sqsException =
+SqsException.builder()
+.awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+.build();
+
+// isFatal returns `true` if an exception is non-fatal

Review Comment:
   I agree with you that it is confusing, but I am not sure why `isFatal` is 
implemented like this. I also referred to 
[this](https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java#L27),
 which says "isFatal returns `true` if an exception is non-fatal".
   
   I also based these test cases on the existing Firehose [test 
cases](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java).
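
   For readers puzzled by the `assertFalse`, the return convention quoted above can be modelled with a small, self-contained sketch. `SketchClassifier` is a hypothetical stand-in written for this thread, not the Flink `FatalExceptionClassifier`; it only assumes the behaviour described in the quoted comment, namely that a recognized fatal error is handed to the consumer and `false` is returned, while anything unrecognized yields `true`.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical chained classifier with the inverted return convention. */
final class SketchClassifier {
    private final Predicate<Throwable> matchesFatal;
    private final SketchClassifier next; // null at the end of the chain

    SketchClassifier(Predicate<Throwable> matchesFatal, SketchClassifier next) {
        this.matchesFatal = matchesFatal;
        this.next = next;
    }

    /**
     * Returns false when the error is recognized as fatal (after reporting
     * it to the consumer), and true when no classifier in the chain matches.
     */
    boolean isFatal(Throwable err, Consumer<Exception> onFatal) {
        if (matchesFatal.test(err)) {
            onFatal.accept(new RuntimeException("fatal error encountered", err));
            return false;
        }
        return next == null || next.isFatal(err, onFatal);
    }
}
```

   Under this convention, a test for a fatal error asserts `false`, and a test for a non-fatal error (for example a timeout) asserts `true`.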






Re: [PR] [FLINK-34123][core][type] Introduce built-in serialization support for map and lists [flink]

2024-05-22 Thread via GitHub


reswqa commented on code in PR #24634:
URL: https://github.com/apache/flink/pull/24634#discussion_r1610929345


##
docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md:
##
@@ -37,15 +37,16 @@ Flink places some restrictions on the type of elements that 
can be in a DataStre
 The reason for this is that the system analyzes the types to determine
 efficient execution strategies.
 
-There are seven different categories of data types:
+There are eight different categories of data types:
 
 1. **Java Tuples** and **Scala Case Classes**
 2. **Java POJOs**
 3. **Primitive Types**
-4. **Regular Classes**
-5. **Values**
-6. **Hadoop Writables**
-7. **Special Types**
+4. **Common Collection Types**

Review Comment:
   > I also plan to add support for Set later. Would you think it better to 
organize them under Special Types?
   
   I think `Common Collection Types` is fair enough, but we should highlight in 
the content that we do not support `Set` for now.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610928629


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkWriterTest.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link 
SqsSinkWriter}. */
+public class SqsSinkWriterTest {
+
+private SqsSinkWriter sinkWriter;
+
+private static final ElementConverter ELEMENT_CONVERTER_PLACEHOLDER =
+SqsSinkElementConverter.builder()
+.setSerializationSchema(new SimpleStringSchema())
+.build();
+
+@BeforeEach
+void setup() throws IOException {
+TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+SqsSink sink =
+new SqsSink<>(
+ELEMENT_CONVERTER_PLACEHOLDER,
+50,
+16,
+1,
+4 * 1024 * 1024L,
+5000L,
+1000 * 1024L,
+true,
+"sqsUrl",
+sinkProperties);
+sinkWriter = (SqsSinkWriter) 
sink.createWriter(sinkInitContext);
+}
+
+@Test
+void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+String testString = "{many hands make light work;";
+SendMessageBatchRequestEntry record = 
SendMessageBatchRequestEntry.builder().messageBody(testString).build();
+assertThat(sinkWriter.getSizeInBytes(record))
+
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);

Review Comment:
   oops, fixed






[jira] [Created] (FLINK-35427) Support fail-on-unknown-field config in json format

2024-05-22 Thread huhuan (Jira)
huhuan created FLINK-35427:
--

 Summary: Support fail-on-unknown-field config in json format
 Key: FLINK-35427
 URL: https://issues.apache.org/jira/browse/FLINK-35427
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: huhuan


In many cases, the consumers and producers of message queues come from different 
teams, or even different companies.
As a message consumer, it is sometimes difficult to keep track of updates to the 
message format, which may result in data loss.
We want to ensure that the message format strictly matches the schema: if a field 
is missing or a new field is added, it is better to fail the application so that 
we can notice and fix the problem quickly.
In this case, a fail-on-unknown-field config for the JSON format could be very 
helpful, especially when it works together with fail-on-missing-field.
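
The requested behaviour can be sketched as a strict field check. This is an illustration only, not the JSON format's code: `StrictFieldCheck` and its `validate` method are hypothetical, and a decoded message is modelled as a plain `Map` from field name to value. The two boolean flags mirror the existing fail-on-missing-field option and the proposed fail-on-unknown-field option.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical strict check of a decoded record against a schema. */
final class StrictFieldCheck {

    /** Throws if the record violates one of the enabled strictness checks. */
    static void validate(
            Set<String> schemaFields,
            Map<String, ?> record,
            boolean failOnMissingField,
            boolean failOnUnknownField) {
        if (failOnUnknownField) {
            // Fields present in the message but absent from the schema.
            Set<String> unknown = new TreeSet<>(record.keySet());
            unknown.removeAll(schemaFields);
            if (!unknown.isEmpty()) {
                throw new IllegalArgumentException("Unknown fields: " + unknown);
            }
        }
        if (failOnMissingField) {
            // Fields required by the schema but absent from the message.
            Set<String> missing = new TreeSet<>(schemaFields);
            missing.removeAll(record.keySet());
            if (!missing.isEmpty()) {
                throw new IllegalArgumentException("Missing fields: " + missing);
            }
        }
    }
}
```

With both flags enabled, a message passes only if its field names match the schema exactly, so a format change made by the producer fails fast instead of being silently ignored.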





Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-22 Thread via GitHub


robobario commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1610925700


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##
@@ -98,16 +108,17 @@ static  CsvBulkWriter forPojo(Class 
pojoClass, FSDataOutputStr
 @Override
 public void addElement(T element) throws IOException {
 final R r = converter.convert(element, converterContext);
-csvWriter.writeValue(stream, r);
+csvWriter.writeValue(generator, r);
 }
 
 @Override
 public void flush() throws IOException {
-stream.flush();
+generator.flush();
 }
 
 @Override
 public void finish() throws IOException {
+generator.close();

Review Comment:
   The CsvGenerator [honours 
AUTO_CLOSE_TARGET](https://github.com/FasterXML/jackson-dataformats-text/blob/3d3165e58b90618a5fbccf630f1604a383afe78c/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/CsvGenerator.java#L504)
 so generator.close() will flush the underlying `Writer` but not close it.
   
   It would be great to have unit tests proving this.
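
   As a possible starting point for such a test, here is a sketch of a recording test double for the underlying `Writer`. To keep it self-contained it exercises a hypothetical `BufferingGenerator` stand-in rather than the real `CsvGenerator`; both class names and the `autoCloseTarget` flag are assumptions made for illustration, and a real test would pass the tracking writer to the actual generator instead.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

/** Test double that records whether flush() and close() were called. */
final class TrackingWriter extends StringWriter {
    int flushCount;
    boolean closed;

    @Override
    public void flush() {
        flushCount++;
        super.flush();
    }

    @Override
    public void close() {
        closed = true; // StringWriter.close() has no effect, so only record the call
    }
}

/** Hypothetical stand-in for a buffering generator; not the Jackson class. */
final class BufferingGenerator {
    private final Writer target;
    private final boolean autoCloseTarget;
    private final StringBuilder buffer = new StringBuilder();

    BufferingGenerator(Writer target, boolean autoCloseTarget) {
        this.target = target;
        this.autoCloseTarget = autoCloseTarget;
    }

    /** Buffers a row without touching the target writer. */
    void writeRow(String row) {
        buffer.append(row).append('\n');
    }

    /** Writes buffered rows, then closes or only flushes the target. */
    void close() {
        try {
            target.write(buffer.toString());
            buffer.setLength(0);
            if (autoCloseTarget) {
                target.close();
            } else {
                target.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   The assertions of interest are that, with auto-close disabled, closing the generator delivers the buffered data, flushes the writer, and leaves it open.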






Re: [PR] [FLINK-35426][table-planner] Change the distribution of DynamicFilteringDataCollector to Broadcast [flink]

2024-05-22 Thread via GitHub


flinkbot commented on PR #24830:
URL: https://github.com/apache/flink/pull/24830#issuecomment-2126172102

   
   ## CI report:
   
   * fdeb2381c152dc3339779d645717ff9099664011 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-35426) Change the distribution of DynamicFilteringDataCollector to Broadcast

2024-05-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35426:
---
Labels: pull-request-available  (was: )

> Change the distribution of DynamicFilteringDataCollector to Broadcast
> -
>
> Key: FLINK-35426
> URL: https://issues.apache.org/jira/browse/FLINK-35426
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
> partition pruning feature of batch jobs to collect the partition information 
> dynamically filtered by the source. Its current data distribution method is 
> rebalance, and it also acts as an upstream vertex to the probe side Source.
> Presently, when the Scheduler dynamically infers the parallelism for vertices 
> that are both downstream and Source, it considers factors from both sides, 
> which can lead to an overestimation of parallelism due to 
> DynamicFilteringDataCollector being an upstream of the Source. We aim to 
> change the distribution method of the DynamicFilteringDataCollector to 
> broadcast to prevent the dynamic overestimation of Source parallelism.
> Furthermore, given that the DynamicFilteringDataCollector transmits data 
> through the OperatorCoordinator rather than through normal data distribution, 
> this change will not affect the DPP (Dynamic Partition Pruning) functionality.





[PR] [FLINK-35426][table-planner] Change the distribution of DynamicFilteringDataCollector to Broadcast [flink]

2024-05-22 Thread via GitHub


SinBex opened a new pull request, #24830:
URL: https://github.com/apache/flink/pull/24830

   ## What is the purpose of the change
   
   Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
partition pruning feature of batch jobs to collect the partition information 
dynamically filtered by the source. Its current data distribution method is 
rebalance, and it also acts as an upstream vertex to the probe side Source.
   
   Presently, when the Scheduler dynamically infers the parallelism for 
vertices that are both downstream and Source, it considers factors from both 
sides, which can lead to an overestimation of parallelism due to 
DynamicFilteringDataCollector being an upstream of the Source. We aim to change 
the distribution method of the DynamicFilteringDataCollector to broadcast to 
prevent the dynamic overestimation of Source parallelism.
   
   Furthermore, given that the DynamicFilteringDataCollector transmits data 
through the OperatorCoordinator rather than through normal data distribution, 
this change will not affect the DPP (Dynamic Partition Pruning) functionality.
   
   
   ## Brief change log
   
 - *Change the distribution of DynamicFilteringDataCollector to Broadcast*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610910711


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsStateSerializer.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer extends 
AsyncSinkWriterStateSerializer<SendMessageBatchRequestEntry> {
+@Override
+protected void serializeRequestToStream(final SendMessageBatchRequestEntry 
request, final DataOutputStream out)
+throws IOException
+{
+out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+}
+
+@Override
+protected SendMessageBatchRequestEntry deserializeRequestFromStream(final 
long requestSize, final DataInputStream in)
+throws IOException
+{
+final byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return SendMessageBatchRequestEntry.builder().messageBody(new 
String(requestData, StandardCharsets.UTF_8)).build();

Review Comment:
   Yes, the test case has been updated.
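
A note on the deserialization path quoted above: `DataInputStream.read(byte[])` may return before the buffer is full, whereas `readFully` blocks until every byte has arrived or throws `EOFException`. Below is a minimal, self-contained sketch of the same UTF-8 round trip. It uses a plain `String` in place of `SendMessageBatchRequestEntry`, and the class and method names are hypothetical, not part of the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the serializer; it works on a plain message body. */
final class MessageBodyRoundTrip {

    /** Writes the message body as raw UTF-8 bytes, as the quoted serializer does. */
    static void serializeBody(String body, DataOutputStream out) throws IOException {
        out.write(body.getBytes(StandardCharsets.UTF_8));
    }

    /** Reads exactly requestSize bytes; readFully throws EOFException if fewer are available. */
    static String deserializeBody(long requestSize, DataInputStream in) throws IOException {
        final byte[] requestData = new byte[(int) requestSize];
        in.readFully(requestData);
        return new String(requestData, StandardCharsets.UTF_8);
    }

    /** Serializes and deserializes one body through in-memory streams. */
    static String roundTrip(String body) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                serializeBody(body, out);
            }
            byte[] bytes = buffer.toByteArray();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                return deserializeBody(bytes.length, in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("h\u00e9llo, SQS"));
    }
}
```

The size passed to `deserializeBody` is the byte length of the encoded body, not the character count, which is why the sketch takes it from the serialized array.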






[jira] [Updated] (FLINK-35421) Schema Operator blocking forever when Akka Rpc timeout

2024-05-22 Thread hk__lrzy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hk__lrzy updated FLINK-35421:
-
Description: When SchemaOperator restarts and no checkpoint has been taken 
before, SchemaOperator does not send *RefreshPendingListsRequest* to the 
coordinator; if the coordinator has pending schema events, SchemaOperator 
blocks forever.  (was: When SchemaOperator restarts and no checkpoint has been 
taken before, SchemaOperator does not send `RefreshPendingListsRequest` to the 
coordinator; if the coordinator has pending schema events, SchemaOperator 
blocks forever.)

> Schema Operator blocking forever when Akka Rpc timeout
> --
>
> Key: FLINK-35421
> URL: https://issues.apache.org/jira/browse/FLINK-35421
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: hk__lrzy
>Priority: Major
>  Labels: pull-request-available
>
> When SchemaOperator restarts and no checkpoint has been taken before, 
> SchemaOperator does not send *RefreshPendingListsRequest* to the coordinator; 
> if the coordinator has pending schema events, SchemaOperator blocks 
> forever.





[jira] [Commented] (FLINK-35426) Change the distribution of DynamicFilteringDataCollector to Broadcast

2024-05-22 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848801#comment-17848801
 ] 

Zhu Zhu commented on FLINK-35426:
-

Good point! [~xiasun]
The task is assigned to you. Feel free to open a PR for it.

> Change the distribution of DynamicFilteringDataCollector to Broadcast
> -
>
> Key: FLINK-35426
> URL: https://issues.apache.org/jira/browse/FLINK-35426
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
> Fix For: 1.20.0
>
>
> Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
> partition pruning feature of batch jobs to collect the partition information 
> dynamically filtered by the source. Its current data distribution method is 
> rebalance, and it also acts as an upstream vertex to the probe side Source.
> Presently, when the Scheduler dynamically infers the parallelism for vertices 
> that are both downstream and Source, it considers factors from both sides, 
> which can lead to an overestimation of parallelism due to 
> DynamicFilteringDataCollector being an upstream of the Source. We aim to 
> change the distribution method of the DynamicFilteringDataCollector to 
> broadcast to prevent the dynamic overestimation of Source parallelism.
> Furthermore, given that the DynamicFilteringDataCollector transmits data 
> through the OperatorCoordinator rather than through normal data distribution, 
> this change will not affect the DPP (Dynamic Partition Pruning) functionality.





[jira] [Assigned] (FLINK-35426) Change the distribution of DynamicFilteringDataCollector to Broadcast

2024-05-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-35426:
---

Assignee: xingbe

> Change the distribution of DynamicFilteringDataCollector to Broadcast
> -
>
> Key: FLINK-35426
> URL: https://issues.apache.org/jira/browse/FLINK-35426
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
> Fix For: 1.20.0
>
>
> Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
> partition pruning feature of batch jobs to collect the partition information 
> dynamically filtered by the source. Its current data distribution method is 
> rebalance, and it also acts as an upstream vertex to the probe side Source.
> Presently, when the Scheduler dynamically infers the parallelism for vertices 
> that are both downstream and Source, it considers factors from both sides, 
> which can lead to an overestimation of parallelism due to 
> DynamicFilteringDataCollector being an upstream of the Source. We aim to 
> change the distribution method of the DynamicFilteringDataCollector to 
> broadcast to prevent the dynamic overestimation of Source parallelism.
> Furthermore, given that the DynamicFilteringDataCollector transmits data 
> through the OperatorCoordinator rather than through normal data distribution, 
> this change will not affect the DPP (Dynamic Partition Pruning) functionality.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2126150641

   Conflicts are fixed. @twalthr @dawidwys @snuyanzin, could you help review 
this PR?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2126150525

   Conclusion:
   Since there are no objections, we will support it with deduplication 
semantics.
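
As an illustration of what deduplication semantics means for an array intersection, here is a plain-Java sketch. It is not the Flink implementation: the class and method names are hypothetical, and the result order (first occurrence in the first array) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper showing an intersection that drops duplicates. */
final class ArrayIntersectSketch {

    /** Returns each element present in both lists exactly once, in order of first appearance in left. */
    static <T> List<T> intersectDistinct(List<T> left, List<T> right) {
        Set<T> rightSet = new HashSet<>(right);
        Set<T> result = new LinkedHashSet<>();
        for (T element : left) {
            if (rightSet.contains(element)) {
                // LinkedHashSet ignores repeated adds, which gives the deduplication.
                result.add(element);
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 1, 2, 3);
        List<Integer> right = Arrays.asList(1, 1, 3, 4);
        System.out.println(intersectDistinct(left, right)); // prints [1, 3]
    }
}
```

Without deduplication, the same inputs could instead yield `[1, 1, 3]`, which is the alternative this conclusion rules out.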





[jira] [Commented] (FLINK-35426) Change the distribution of DynamicFilteringDataCollector to Broadcast

2024-05-22 Thread xingbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848800#comment-17848800
 ] 

xingbe commented on FLINK-35426:


[~zhuzh] Could you please assign this ticket to me? Thanks.

> Change the distribution of DynamicFilteringDataCollector to Broadcast
> -
>
> Key: FLINK-35426
> URL: https://issues.apache.org/jira/browse/FLINK-35426
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: xingbe
>Priority: Major
> Fix For: 1.20.0
>
>
> Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
> partition pruning feature of batch jobs to collect the partition information 
> dynamically filtered by the source. Its current data distribution method is 
> rebalance, and it also acts as an upstream vertex to the probe side Source.
> Presently, when the Scheduler dynamically infers the parallelism for vertices 
> that are both downstream and Source, it considers factors from both sides, 
> which can lead to an overestimation of parallelism due to 
> DynamicFilteringDataCollector being an upstream of the Source. We aim to 
> change the distribution method of the DynamicFilteringDataCollector to 
> broadcast to prevent the dynamic overestimation of Source parallelism.
> Furthermore, given that the DynamicFilteringDataCollector transmits data 
> through the OperatorCoordinator rather than through normal data distribution, 
> this change will not affect the DPP (Dynamic Partition Pruning) functionality.





[jira] [Created] (FLINK-35426) Change the distribution of DynamicFilteringDataCollector to Broadcast

2024-05-22 Thread xingbe (Jira)
xingbe created FLINK-35426:
--

 Summary: Change the distribution of DynamicFilteringDataCollector 
to Broadcast
 Key: FLINK-35426
 URL: https://issues.apache.org/jira/browse/FLINK-35426
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: xingbe
 Fix For: 1.20.0


Currently, the DynamicFilteringDataCollector is utilized in the dynamic 
partition pruning feature of batch jobs to collect the partition information 
dynamically filtered by the source. Its current data distribution method is 
rebalance, and it also acts as an upstream vertex to the probe side Source.

Presently, when the Scheduler dynamically infers the parallelism for vertices 
that are both downstream and Source, it considers factors from both sides, 
which can lead to an overestimation of parallelism due to 
DynamicFilteringDataCollector being an upstream of the Source. We aim to change 
the distribution method of the DynamicFilteringDataCollector to broadcast to 
prevent the dynamic overestimation of Source parallelism.

Furthermore, given that the DynamicFilteringDataCollector transmits data 
through the OperatorCoordinator rather than through normal data distribution, 
this change will not affect the DPP (Dynamic Partition Pruning) functionality.





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-22 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1609545011


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List<List<InputChannelStateHandle>> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Earlier, I misjudged the error because I did not notice the negation in the 
condition. 
   But if you replace `anyMatch` with `allMatch`, you can see that the test 
stops passing.
   I also changed gateIdx from a randomly generated value to always zero (in 
`StateHandleDummyUtil#createNewInputChannelStateHandle`). This change does not 
affect other tests, but this test needs zero because it requires a stable 
number of input gates.
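
To illustrate why the choice between `anyMatch` and `allMatch` changes the outcome of such a condition, here is a small standalone sketch. The parallelism values are made up and the helper names are hypothetical; it shows only the stream semantics, not the actual `StateAssignmentOperation` logic.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helpers comparing an old parallelism against several upstream vertices. */
final class MatchSemanticsSketch {

    /** True if at least one upstream vertex has the old parallelism. */
    static boolean anyUpstreamMatches(int oldParallelism, List<Integer> upstreamParallelisms) {
        return upstreamParallelisms.stream().anyMatch(p -> p == oldParallelism);
    }

    /** True only if every upstream vertex has the old parallelism. */
    static boolean allUpstreamsMatch(int oldParallelism, List<Integer> upstreamParallelisms) {
        return upstreamParallelisms.stream().allMatch(p -> p == oldParallelism);
    }

    public static void main(String[] args) {
        // Two upstream vertices: one kept parallelism 2, the other was rescaled to 4.
        List<Integer> upstream = Arrays.asList(2, 4);
        System.out.println(anyUpstreamMatches(2, upstream)); // prints true
        System.out.println(allUpstreamsMatch(2, upstream));  // prints false
    }
}
```

With mixed upstream parallelisms the two predicates disagree, which is the kind of case a test with several input gates can exercise.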






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610896956


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkITCase.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test suite for the {@code SqsSink} using a localstack container.
+ */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+class SqsSinkITCase {

Review Comment:
   I created "flink-connector-aws-sqs-e2e-tests" under 
"flink-connector-aws-e2e-tests" and moved this test there, but I am really 
struggling to run it there. Running LocalStack does not help, and I tried to 
follow the README given 
[here](https://github.com/apache/flink-connector-aws/tree/main/flink-connector-aws-e2e-tests).
 I downloaded flink-dist-1.19.0.jar, placed it under the 
"flink-connector-aws-e2e-tests" folder, and ran "mvn clean verify 
-Prun-end-to-end-tests -DdistDir=flink-dist-1.19.0.jar", but it fails 
with an error [screenshot attached]. Any documentation on how to easily run 
these tests would be super helpful. Thanks!
   
   Screenshot: 
https://github.com/apache/flink-connector-aws/assets/169495197/406cae7a-5eae-421f-8e11-cae88051f78f






Re: [PR] [FLINK-35355][State] Internal async aggregating state and corresponding state descriptor [flink]

2024-05-22 Thread via GitHub


jectpro7 commented on PR #24810:
URL: https://github.com/apache/flink/pull/24810#issuecomment-2126086682

   Hi @Zakelly, I've addressed the comments. Could you please take another 
look? Thanks in advance.





[jira] [Created] (FLINK-35425) Support convert Freshness to cron expression for full refresh mode

2024-05-22 Thread dalongliu (Jira)
dalongliu created FLINK-35425:
-

 Summary: Support convert Freshness to cron expression for full 
refresh mode 
 Key: FLINK-35425
 URL: https://issues.apache.org/jira/browse/FLINK-35425
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Gateway, Table SQL / Planner
Affects Versions: 1.20.0
Reporter: dalongliu
 Fix For: 1.20.0








[jira] [Commented] (FLINK-35421) Schema Operator blocking forever when Akka Rpc timeout

2024-05-22 Thread hk__lrzy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848793#comment-17848793
 ] 

hk__lrzy commented on FLINK-35421:
--

[~kunni] Thank you for the reply. I already have a pull request that fixes the 
issue; maybe you can assign the Jira issue to me and review the PR.

> Schema Operator blocking forever when Akka Rpc timeout
> --
>
> Key: FLINK-35421
> URL: https://issues.apache.org/jira/browse/FLINK-35421
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: hk__lrzy
>Priority: Major
>  Labels: pull-request-available
>
> When SchemaOperator restarts and no checkpoint has been taken before, 
> SchemaOperator does not send `RefreshPendingListsRequest` to the coordinator; 
> if the coordinator has pending schema events, SchemaOperator blocks 
> forever.





[jira] [Updated] (FLINK-35421) Schema Operator blocking forever when Akka Rpc timeout

2024-05-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35421:
---
Labels: pull-request-available  (was: )

> Schema Operator blocking forever when Akka Rpc timeout
> --
>
> Key: FLINK-35421
> URL: https://issues.apache.org/jira/browse/FLINK-35421
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: hk__lrzy
>Priority: Major
>  Labels: pull-request-available
>
> When SchemaOperator restarts and no checkpoint has been taken before, 
> SchemaOperator does not send `RefreshPendingListsRequest` to the coordinator; 
> if the coordinator has pending schema events, SchemaOperator blocks 
> forever.





[PR] [FLINK-35421]fixed schema operator blocking when restart [flink-cdc]

2024-05-22 Thread via GitHub


hk-lrzy opened a new pull request, #3350:
URL: https://github.com/apache/flink-cdc/pull/3350

   (no comment)





Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-22 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1610884892


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   OK, I will make it throw the exception and add a test.
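
For context on the `catch (RuntimeException e)` branch in the quoted diff: `URLDecoder.decode(String, String)` throws `IllegalArgumentException` for malformed percent-escapes, so returning the input there hides bad data. Below is a minimal sketch of a throwing variant outside of Flink; the class and method names are hypothetical and not the PR's code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that lets malformed input surface as an exception. */
final class UrlDecodeSketch {

    /** Decodes a URL-encoded string as UTF-8; malformed escapes raise IllegalArgumentException. */
    static String decode(String value) {
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8, so this is not expected.
            throw new IllegalStateException("UTF-8 charset is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("a%20b%2Bc+d")); // prints a b+c d
        try {
            decode("%zz");
        } catch (IllegalArgumentException e) {
            System.out.println("malformed input rejected");
        }
    }
}
```

A test for the throwing behavior can then assert on the exception type for inputs such as `%zz` or a trailing `%`.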






[jira] [Comment Edited] (FLINK-31223) sql-client.sh fails to start with ssl enabled

2024-05-22 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840864#comment-17840864
 ] 

Weijie Guo edited comment on FLINK-31223 at 5/23/24 2:17 AM:
-

master(1.20) via 259aef0b66b3e55013f02ac31d4cff61202b78c5.
release-1.19 via a5efed2c06c42186e376dd2ede18dc09986c0387.


was (Author: weijie guo):
master(1.20) via 259aef0b66b3e55013f02ac31d4cff61202b78c5.

> sql-client.sh fails to start with ssl enabled
> -
>
> Key: FLINK-31223
> URL: https://issues.apache.org/jira/browse/FLINK-31223
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: macdoor615
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0
>
>
> *Version:* 1.17-SNAPSHOT *Commit:* c66ef25 
> 1. ssl disabled 
> sql-client.sh works properly
> 2. ssl enabled
> web ui can access with [https://url|https://url/]
> The task can be submitted correctly through sql-gateway. I can confirm that 
> sql-gateway exposes the http protocol, not https.
> But sql-client.sh fails to start with the following exceptions. It seems that 
> sql-client.sh expect https protocol
>  
> {code:java}
> 2023-02-25 14:43:19,317 INFO  org.apache.flink.configuration.Configuration    
>              [] - Config uses fallback configuration key 'rest.port' instead 
> of key 'rest.bind-port'
> 2023-02-25 14:43:19,343 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Starting 
> rest endpoint.
> 2023-02-25 14:43:19,713 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Rest 
> endpoint listening at localhost:44922
> 2023-02-25 14:43:19,715 INFO  org.apache.flink.table.client.SqlClient         
>              [] - Start embedded gateway on port 44922
> 2023-02-25 14:43:20,040 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shutting 
> down rest endpoint.
> 2023-02-25 14:43:20,088 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shut down 
> complete.
> 2023-02-25 14:43:20,089 ERROR org.apache.flink.table.client.SqlClient         
>              [] - SQL Client must stop.
> org.apache.flink.table.client.SqlClientException: Failed to create the 
> executor.
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:170)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:113)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.Executor.create(Executor.java:34) 
> ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:110) 
> ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) 
> [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) 
> [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Failed to get response.
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:427)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:416)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.negotiateVersion(ExecutorImpl.java:447)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:132)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         ... 5 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: 
> not an SSL/TLS record: 
> 485454502f312e3120343034204e6f7420466f756e640d0a636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e3b20636861727365743d5554462d380d0a6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a636f6e74656e742d6c656e6774683a2033380d0a0d0a7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:489)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SN

[jira] [Updated] (FLINK-31223) sql-client.sh fails to start with ssl enabled

2024-05-22 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31223:
---
Fix Version/s: 1.19.1

> sql-client.sh fails to start with ssl enabled
> -
>
> Key: FLINK-31223
> URL: https://issues.apache.org/jira/browse/FLINK-31223
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: macdoor615
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0, 1.19.1
>
>
> *Version:* 1.17-SNAPSHOT *Commit:* c66ef25 
> 1. ssl disabled 
> sql-client.sh works properly
> 2. ssl enabled
> The web UI can be accessed at [https://url|https://url/]
> The task can be submitted correctly through sql-gateway. I can confirm that 
> sql-gateway exposes the http protocol, not https.
> But sql-client.sh fails to start with the following exceptions. It seems that 
> sql-client.sh expects the https protocol.
>  
> {code:java}
> 2023-02-25 14:43:19,317 INFO  org.apache.flink.configuration.Configuration    
>              [] - Config uses fallback configuration key 'rest.port' instead 
> of key 'rest.bind-port'
> 2023-02-25 14:43:19,343 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Starting 
> rest endpoint.
> 2023-02-25 14:43:19,713 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Rest 
> endpoint listening at localhost:44922
> 2023-02-25 14:43:19,715 INFO  org.apache.flink.table.client.SqlClient         
>              [] - Start embedded gateway on port 44922
> 2023-02-25 14:43:20,040 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shutting 
> down rest endpoint.
> 2023-02-25 14:43:20,088 INFO  
> org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shut down 
> complete.
> 2023-02-25 14:43:20,089 ERROR org.apache.flink.table.client.SqlClient         
>              [] - SQL Client must stop.
> org.apache.flink.table.client.SqlClientException: Failed to create the 
> executor.
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:170)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:113)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.Executor.create(Executor.java:34) 
> ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:110) 
> ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) 
> [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) 
> [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Failed to get response.
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:427)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:416)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.negotiateVersion(ExecutorImpl.java:447)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:132)
>  ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         ... 5 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: 
> not an SSL/TLS record: 
> 485454502f312e3120343034204e6f7420466f756e640d0a636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e3b20636861727365743d5554462d380d0a6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a636f6e74656e742d6c656e6774683a2033380d0a0d0a7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:489)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.Abstr

Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to g… [flink]

2024-05-22 Thread via GitHub


reswqa merged PR #24742:
URL: https://github.com/apache/flink/pull/24742


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to g… [flink]

2024-05-22 Thread via GitHub


reswqa commented on PR #24742:
URL: https://github.com/apache/flink/pull/24742#issuecomment-2126065522

   Thanks for the backport. Could you squash these two commits, thanks!





Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]

2024-05-22 Thread via GitHub


reswqa commented on code in PR #24741:
URL: https://github.com/apache/flink/pull/24741#discussion_r1610874737


##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTestBase.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.client.cli.TerminalUtils;
+
+import org.jline.terminal.Size;
+import org.jline.terminal.Terminal;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+
+/** Base class for test {@link SqlClient}. */
+class SqlClientTestBase {
+@TempDir private Path tempFolder;
+
+protected String historyPath;
+
+protected Map<String, String> originalEnv;
+
+@BeforeEach
+void before() throws IOException {
+originalEnv = System.getenv();
+
+// prepare conf dir
+File confFolder = Files.createTempDirectory(tempFolder, "conf").toFile();
+File confYaml = new File(confFolder, "config.yaml");

Review Comment:
   IIRC, `config.yaml` will not be recognized as a valid flink configuration 
file in 1.18. It should be `flink-conf.yaml`.






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-22 Thread via GitHub


loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1610861124


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext 
createFetchTaskContext(JdbcSourceConfig taskSo
 
 @Override
 public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {

Review Comment:
   What about doing this in 
IncrementalSourceReaderWithCommit#notifyCheckpointComplete and 
PostgresSourceReader#notifyCheckpointComplete? The reader controls when and whether 
to commit the offset, while the dialect just provides the ability to do it.
   
   And once this is in the reader, we can use a plain long rather than an AtomicLong, so we 
can set checkpointCount = (checkpointCount+1)%3
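
   The counter suggested above can be sketched as follows. This is a minimal illustration, assuming checkpoint-completion notifications arrive on a single thread so that a plain `long` is enough. `CommitEveryNthCheckpoint` and `shouldCommit` are hypothetical names for this example and are not part of the Flink CDC API; the cycle length is passed in rather than hard-coded to 3.

```java
/** Hypothetical helper: decides on which completed checkpoints offsets get committed. */
class CommitEveryNthCheckpoint {
    private final int checkpointCycle;

    // Plain field instead of AtomicLong: assumes all calls come from one thread.
    private long checkpointCount = 0;

    CommitEveryNthCheckpoint(int checkpointCycle) {
        if (checkpointCycle <= 0) {
            throw new IllegalArgumentException("checkpointCycle must be positive");
        }
        this.checkpointCycle = checkpointCycle;
    }

    /** Call once per completed checkpoint; returns true on every checkpointCycle-th call. */
    boolean shouldCommit() {
        checkpointCount = (checkpointCount + 1) % checkpointCycle;
        return checkpointCount == 0;
    }
}
```

   A reader would call `shouldCommit()` from its checkpoint-complete callback and only ask the dialect to commit the offset when it returns true.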



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {
 
 // The PostgresSource will do snapshot according to its StartupMode.
 // Do not need debezium to do the snapshot work.
-props.put("snapshot.mode", "never");
+props.setProperty("snapshot.mode", "never");
+
+props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));

Review Comment:
   What does "checkpoint.cycle" do? Is it Debezium's offset commit cycle? 
Flink CDC is already responsible for committing offsets, so please do not let 
Debezium do it again (turn it off).






Re: [PR] [hotfix][rest] Add getter methods for SubtasksTimesInfo to get all values easily [flink]

2024-05-22 Thread via GitHub


1996fanrui merged PR #24825:
URL: https://github.com/apache/flink/pull/24825





Re: [PR] [hotfix][rest] Add getter methods for SubtasksTimesInfo to get all values easily [flink]

2024-05-22 Thread via GitHub


1996fanrui commented on PR #24825:
URL: https://github.com/apache/flink/pull/24825#issuecomment-2126057162

   Thanks @RocMarshal for the quick review, merging~





Re: [PR] [hotfix][rest] Add getter methods for SubtasksTimesInfo to get all values easily [flink]

2024-05-22 Thread via GitHub


1996fanrui merged PR #24824:
URL: https://github.com/apache/flink/pull/24824





[jira] [Commented] (FLINK-35421) Schema Operator blocking forever when Akka Rpc timeout

2024-05-22 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848788#comment-17848788
 ] 

LvYanquan commented on FLINK-35421:
---

Thanks for pointing out this issue. I am willing to fix it.

> Schema Operator blocking forever when Akka Rpc timeout
> --
>
> Key: FLINK-35421
> URL: https://issues.apache.org/jira/browse/FLINK-35421
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: hk__lrzy
>Priority: Major
>
> When SchemaOperator restarts and no checkpoint has been taken before, 
> SchemaOperator will not send `RefreshPendingListsRequest` to the coordinator. 
> If the coordinator has pending schema events, SchemaOperator will then block 
> forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35417) JobManager and TaskManager support merging and run in a single process

2024-05-22 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-35417:
-
Component/s: Runtime / Coordination
 (was: API / Core)

> JobManager and TaskManager support merging and run in a single process
> --
>
> Key: FLINK-35417
> URL: https://issues.apache.org/jira/browse/FLINK-35417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: melin
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Flink is widely used in data integration scenarios where the required 
> parallelism is low; in many cases a parallelism of one is enough to run a 
> job. With high availability in application mode, the large number of 
> JobManager nodes costs a lot of resources. If session mode is used instead, 
> stability is not high.
> In application mode, JobManager and TaskManager could be run together to 
> achieve reliability and save resources.
>  
>  





[jira] [Updated] (FLINK-35129) Postgres source commits the offset after every multiple checkpoint cycles.

2024-05-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35129:
---
Labels: pull-request-available  (was: )

> Postgres source commits the offset after every multiple checkpoint cycles.
> --
>
> Key: FLINK-35129
> URL: https://issues.apache.org/jira/browse/FLINK-35129
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> After entering the stream phase, the offset consumed by the global slot is 
> committed upon the completion of each checkpoint, so that log files can be 
> recycled continuously; otherwise they could accumulate and lead to insufficient 
> disk space.
> However, the job can then only restart from the latest checkpoint or savepoint. If it is 
> restored from an earlier state, the WAL may already have been recycled.
>  
> The way to solve this is to commit the offset only after every multiple checkpoint 
> cycles. The number of checkpoint cycles is determined by a connector option, and 
> the default value is 3.





[PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-22 Thread via GitHub


morazow opened a new pull request, #3349:
URL: https://github.com/apache/flink-cdc/pull/3349

   https://issues.apache.org/jira/browse/FLINK-35129
   
   - Adds option for checkpoint cycle parameter
   - Commits PG offsets on every multiple of checkpoint cycle
   - Updates docs with new parameter definition
   - [TODO] Adds unit & integration tests to cover new functionality





[jira] [Updated] (FLINK-35424) Elasticsearch connector 8 supports SSL context

2024-05-22 Thread Mingliang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingliang Liu updated FLINK-35424:
--
Description: 
In FLINK-34369, we added SSL support for the base Elasticsearch sink class 
that is used by both Elasticsearch 6 and 7. The Elasticsearch 8 connector 
uses the AsyncSink API and does not use the aforementioned base sink class, 
so it needs a separate change to support this feature.

This is especially important for Elasticsearch 8, which enables security by default. 
It would also be worthwhile to add integration tests for this SSL context support.

  was:In  FLINK-34369, we added SSL support for the base Elasticsearch sink 
class that is used by both Elasticsearch 6 and 7. The Elasticsearch 8 connector 
is using the AsyncSink API and need separate change to support this feature. 
This is specially important to Elasticsearch 8 which enables secure by default. 
Meanwhile, it merits if we add integration tests for this SSL context support.


> Elasticsearch connector 8 supports SSL context
> --
>
> Key: FLINK-35424
> URL: https://issues.apache.org/jira/browse/FLINK-35424
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.17.1
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-34369, we added SSL support for the base Elasticsearch sink class 
> that is used by both Elasticsearch 6 and 7. The Elasticsearch 8 connector 
> uses the AsyncSink API and does not use the aforementioned base sink 
> class, so it needs a separate change to support this feature.
> This is especially important for Elasticsearch 8, which enables security by 
> default. It would also be worthwhile to add integration tests for this SSL 
> context support.





[jira] [Updated] (FLINK-35424) Elasticsearch connector 8 supports SSL context

2024-05-22 Thread Mingliang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingliang Liu updated FLINK-35424:
--
Description: In FLINK-34369, we added SSL support for the base 
Elasticsearch sink class that is used by both Elasticsearch 6 and 7. The 
Elasticsearch 8 connector uses the AsyncSink API and needs a separate change 
to support this feature. This is especially important for Elasticsearch 8, which 
enables security by default. It would also be worthwhile to add integration tests for 
this SSL context support.  (was: In )

> Elasticsearch connector 8 supports SSL context
> --
>
> Key: FLINK-35424
> URL: https://issues.apache.org/jira/browse/FLINK-35424
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.17.1
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-34369, we added SSL support for the base Elasticsearch sink class 
> that is used by both Elasticsearch 6 and 7. The Elasticsearch 8 connector 
> uses the AsyncSink API and needs a separate change to support this feature. 
> This is especially important for Elasticsearch 8, which enables security by 
> default. It would also be worthwhile to add integration tests for this SSL 
> context support.





[jira] [Updated] (FLINK-35424) Elasticsearch connector 8 supports SSL context

2024-05-22 Thread Mingliang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingliang Liu updated FLINK-35424:
--
Description: In   (was: The current Flink ElasticSearch connector does not 
support SSL option, causing issues connecting to secure ES clusters.

As SSLContext is not serializable and possibly environment aware, we can add a 
(serializable) provider of SSL context to the {{NetworkClientConfig}}.)

> Elasticsearch connector 8 supports SSL context
> --
>
> Key: FLINK-35424
> URL: https://issues.apache.org/jira/browse/FLINK-35424
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.17.1
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> In 





[jira] [Created] (FLINK-35424) Elasticsearch connector 8 supports SSL context

2024-05-22 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-35424:
-

 Summary: Elasticsearch connector 8 supports SSL context
 Key: FLINK-35424
 URL: https://issues.apache.org/jira/browse/FLINK-35424
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.17.1
Reporter: Mingliang Liu
Assignee: Mingliang Liu


The current Flink ElasticSearch connector does not support SSL option, causing 
issues connecting to secure ES clusters.

As SSLContext is not serializable and possibly environment aware, we can add a 
(serializable) provider of SSL context to the {{NetworkClientConfig}}.
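
A serializable provider of this kind could look roughly like the sketch below. It is an illustration only: `SslContextProvider`, `DefaultSslContextProvider`, and `SslProviderDemo` are hypothetical names, and how such a provider would actually be wired into {{NetworkClientConfig}} is not shown here. The point is that the provider object is what gets serialized, while the `SSLContext` itself is built lazily on the machine that uses it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.NoSuchAlgorithmException;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;

/** Hypothetical contract: a supplier that can be shipped with the sink configuration. */
interface SslContextProvider extends Supplier<SSLContext>, Serializable {}

/** Example provider that returns the JVM default context; a real one might load a trust store. */
class DefaultSslContextProvider implements SslContextProvider {
    private static final long serialVersionUID = 1L;

    @Override
    public SSLContext get() {
        try {
            // Built where get() is called, so it reflects the local environment.
            return SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Default SSLContext is not available", e);
        }
    }
}

class SslProviderDemo {
    /** Round-trips the provider through Java serialization, as happens when a config is distributed. */
    static SslContextProvider roundTrip(SslContextProvider provider)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(provider);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SslContextProvider) in.readObject();
        }
    }
}
```

Serializing the provider rather than the context avoids the non-serializable `SSLContext` and lets each worker resolve key and trust material from its own environment.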





Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-22 Thread via GitHub


snuyanzin commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1610775137


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   I would follow the same approach as Spark does; otherwise it's impossible to say 
whether the function failed or succeeded... 
   Also, we need a test for this case, which seems to be absent... 
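
   To illustrate the concern: returning the input unchanged on failure makes a failed decode look like a successful one. The sketch below is one possible alternative, assuming `null` is chosen as the failure signal. It does not claim to reproduce Spark's behavior, and `UrlDecodeSketch` / `urlDecodeOrNull` are hypothetical names that work on plain `String` rather than `StringData`.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

class UrlDecodeSketch {
    /** Returns the decoded string, or null when the input is null or malformed. */
    static String urlDecodeOrNull(String value) {
        if (value == null) {
            return null;
        }
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is available on every JVM, so this indicates a broken runtime.
            throw new IllegalStateException(e);
        } catch (IllegalArgumentException e) {
            // Malformed escape such as "%zz": report failure instead of echoing the input.
            return null;
        }
    }
}
```

   With this shape, a test for the failure case only needs to assert that a malformed input such as `%zz` yields `null`.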






Re: [PR] remove duplicated word [flink]

2024-05-22 Thread via GitHub


flinkbot commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-212581

   
   ## CI report:
   
   * b0bd48bd8861803713456e8b8e3ca13be7d01ec1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] remove duplicated word [flink]

2024-05-22 Thread via GitHub


naferx opened a new pull request, #24829:
URL: https://github.com/apache/flink/pull/24829

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-35423][table] ARRAY_EXCEPT should follow set semantics [flink]

2024-05-22 Thread via GitHub


flinkbot commented on PR #24828:
URL: https://github.com/apache/flink/pull/24828#issuecomment-2125833919

   
   ## CI report:
   
   * cebfd0b8fa496b5cc97280c044352a71dbd75b1e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Assigned] (FLINK-35423) ARRAY_EXCEPT should support set semantics

2024-05-22 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-35423:
---

Assignee: Sergey Nuyanzin

> ARRAY_EXCEPT should support set semantics
> -
>
> Key: FLINK-35423
> URL: https://issues.apache.org/jira/browse/FLINK-35423
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available
>
> After a number of discussions e.g. here [1]
> It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
> {{ARRAY_INTERSECT}}.
> It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and 
> has not been released yet, so the change should be done before 1.20.0 release 
> to avoid inconsistencies.
> [1] https://github.com/apache/flink/pull/24526





[jira] [Updated] (FLINK-35423) ARRAY_EXCEPT should support set semantics

2024-05-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35423:
---
Labels: pull-request-available  (was: )

> ARRAY_EXCEPT should support set semantics
> -
>
> Key: FLINK-35423
> URL: https://issues.apache.org/jira/browse/FLINK-35423
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available
>
> After a number of discussions e.g. here [1]
> It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
> {{ARRAY_INTERSECT}}.
> It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and 
> has not been released yet, so the change should be done before 1.20.0 release 
> to avoid inconsistencies.
> [1] https://github.com/apache/flink/pull/24526





[PR] [FLINK-35423][table] ARRAY_EXCEPT should follow set semantics [flink]

2024-05-22 Thread via GitHub


snuyanzin opened a new pull request, #24828:
URL: https://github.com/apache/flink/pull/24828

   
   
   ## What is the purpose of the change
   
   This PR makes `ARRAY_EXCEPT` follow set semantics.
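
   As a rough illustration of what set semantics means for an array difference, here is a minimal sketch on plain Java lists. `ArrayExceptSketch.arrayExcept` is a hypothetical helper, not the `ArrayExceptFunction` changed in this PR; keeping first-occurrence order and treating `null` like any other element are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ArrayExceptSketch {
    /** Distinct elements of first that do not occur in second, in first-occurrence order. */
    static <T> List<T> arrayExcept(List<T> first, List<T> second) {
        Set<T> excluded = new HashSet<>(second);
        // LinkedHashSet removes duplicates while keeping encounter order.
        Set<T> result = new LinkedHashSet<>();
        for (T element : first) {
            if (!excluded.contains(element)) {
                result.add(element);
            }
        }
        return new ArrayList<>(result);
    }
}
```

   Under these assumptions, `arrayExcept([1, 2, 2, 3, 3], [2])` yields `[1, 3]`: every occurrence of `2` is dropped and the duplicate `3` is collapsed.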
   
   ## Brief change log
   
   
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
   
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java
   
   ## Verifying this change
   
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


ViktorCosenza commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2125705836

   I see. I got the impression that they were forgotten rather than purposely left
   out, because no tests covered writing nested structures (if there had been, the
   tests would have failed and the methods would have been implemented earlier).
   
   Did you get the impression they were left out on purpose? I could add more
   tests if you think it would help.
   
   
   On Wed, 22 May 2024 at 17:26 Jing Ge ***@***.***> wrote:
   
   > It looks like those methods were skipped on purpose in #17542
   >  and #17542 (comment)
   





Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


JingGe commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2125686671

   It looks like those methods were skipped on purpose in #17542 and 
https://github.com/apache/flink/pull/17542#issuecomment-1954552466





Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


ViktorCosenza commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2125670540

   > > Do you have any hints about the background info?
   > 
   > Not really. I found this issue because we were trying to save Parquet 
files to S3 and the writer would fail because an empty array is invalid in 
Parquet. When stepping through the debugger, I realized that the methods were empty.
   > 
   > After digging a bit, I found 2 PRs here that jointly solved the problem, 
but both were inactive. I simply created this PR by merging both of them to 
facilitate the review/merge process.
   
   Also, as @xccui mentioned, there is a very similar issue/code with methods 
missing on apache-paimon, so maybe this code was copied around a bit without 
these methods





Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


JingGe commented on code in PR #24795:
URL: https://github.com/apache/flink/pull/24795#discussion_r1610586619


##
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java:
##
@@ -381,9 +381,16 @@ private MapWriter(LogicalType keyType, LogicalType 
valueType, GroupType groupTyp
 
 @Override
 public void write(RowData row, int ordinal) {
-recordConsumer.startGroup();
+writeMapData(row.getMap(ordinal));
+}
 
-MapData mapData = row.getMap(ordinal);
+@Override
+public void write(ArrayData arrayData, int ordinal) {

Review Comment:
   NIT: Not sure why those write(ArrayData arrayData, int ordinal) methods 
didn't get implemented. It should be trivial to do it while building those 
Writers. Do you have any hints about the background info?






Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


ViktorCosenza commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2125663861

   > Do you have any hints about the background info?
   
   Not really. I found this issue because we were trying to save Parquet 
files to S3 and the writer would fail because an empty array is invalid in 
Parquet. When stepping through the debugger, I realized that the methods were empty.
   
   After digging a bit, I found 2 PRs here that jointly solved the problem, but 
both were inactive. I simply created this PR by merging both of them to 
facilitate the review/merge process.
   





Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


JingGe commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2125655594

   Thanks @ViktorCosenza for taking care of it. The PR looks overall good. Not 
sure why those `write(ArrayData arrayData, int ordinal)` methods didn't get 
implemented. It should be trivial to do it while building those Writers. Do you 
have any hints about the background info?





Re: [PR] chore: Update pekko version [flink]

2024-05-22 Thread via GitHub


He-Pin commented on PR #24823:
URL: https://github.com/apache/flink/pull/24823#issuecomment-2125584766

   @flinkbot run azure





Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-22 Thread via GitHub


afedulov commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1610487523


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##
@@ -98,16 +108,17 @@ static  CsvBulkWriter forPojo(Class 
pojoClass, FSDataOutputStr
 @Override
 public void addElement(T element) throws IOException {
 final R r = converter.convert(element, converterContext);
-csvWriter.writeValue(stream, r);
+csvWriter.writeValue(generator, r);
 }
 
 @Override
 public void flush() throws IOException {
-stream.flush();
+generator.flush();
 }
 
 @Override
 public void finish() throws IOException {
+generator.close();

Review Comment:
   Hmm, I am not sure this is safe, see:
   
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java#L69-L70






Re: [PR] [FLINK-20400][e2e] Migrate test_streaming_sql.sh [flink]

2024-05-22 Thread via GitHub


jeyhunkarimov commented on code in PR #24776:
URL: https://github.com/apache/flink/pull/24776#discussion_r1610462318


##
flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+
+import org.junit.jupiter.api.AfterAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+class StreamSQLTestProgramScalaPlannerITCase extends 
AbstractStreamSQLTestProgramITCase {
+private static final Logger LOGGER =
+
LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class);
+private static final String DIST_DIR = System.getProperty("distDir");
+
+@Override
+protected FlinkContainers createFlinkContainers() {

Review Comment:
   Should we ensure that `DIST_DIR` is neither null nor empty?
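
One possible way to address this, sketched under the assumption that failing fast with a clear message is acceptable in the test: validate the property value before building any paths from it. `DistDirCheckSketch` and `requireDistDir` are hypothetical names and are not part of the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative only: hypothetical helper, not part of the PR. */
class DistDirCheckSketch {

    /** Returns the directory as a Path, or fails fast if the value is missing or blank. */
    static Path requireDistDir(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                    "System property 'distDir' must point to the Flink distribution directory");
        }
        return Paths.get(value.trim());
    }

    public static void main(String[] args) {
        // Same property name as in the test class under review.
        String distDir = System.getProperty("distDir");
        try {
            System.out.println("Using distribution directory: " + requireDistDir(distDir));
        } catch (IllegalStateException e) {
            System.out.println("Invalid configuration: " + e.getMessage());
        }
    }
}
```

In the test class, such a check could run once before the planner jars are swapped, so that a missing `-DdistDir=...` surfaces as a configuration error and not as a confusing file-system failure.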



##
flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+
+import org.junit.jupiter.api.AfterAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+class StreamSQLTestProgramScalaPlannerITCase extends 
AbstractStreamSQLTestProgramITCase {
+private static final Logger LOGGER =
+
LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class);
+private static final String DIST_DIR = System.getProperty("distDir");
+
+@Override
+protected FlinkContainers createFlinkContainers() {
+// Swap planner jar files in `lib/` and `opt/` folders
+swapPlannerLoaderWithPlannerScala();
+
+return FlinkContainers.builder()
+.withFlinkContainersSettings(
+
FlinkContainersSettings.builder().numTaskManagers(4).build())
+.withTestcontainersSettings(
+
TestcontainersSettings.builder().network(NETWORK).logger(LOGGER).build())
+.build();
+}
+
+@AfterAll
+static void afterAll() {
+swapPlannerScalaWithPlannerLoader();
+}
+
+private void swapPlannerLoaderWithPlannerScala() {

Review Comment:
   Maybe we can invoke this from a `@BeforeAll` method instead of doing it in the constructor?



##
flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/c

[jira] [Commented] (FLINK-35417) JobManager and TaskManager support merging and run in a single process

2024-05-22 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848706#comment-17848706
 ] 

Jing Ge commented on FLINK-35417:
-

[~jeyhunkarimov] I have assigned the issue to you. Please pay attention to the comments above, thanks!

> JobManager and TaskManager support merging and run in a single process
> --
>
> Key: FLINK-35417
> URL: https://issues.apache.org/jira/browse/FLINK-35417
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: melin
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Flink is widely used in data integration scenarios, where the parallelism of a 
> single job is low, and in many cases a parallelism of one is enough to run a 
> task. With high availability in application mode, the large number of 
> JobManager nodes costs a lot of resources. If session mode is used instead, 
> stability is poor.
> In application mode, JobManager and TaskManager could run together in a single 
> process to achieve reliability and save resources.
>  
>  





Re: [PR] [FLINK-35216] Support for RETURNING clause of JSON_QUERY [flink]

2024-05-22 Thread via GitHub


snuyanzin commented on PR #24704:
URL: https://github.com/apache/flink/pull/24704#issuecomment-2125483868

   Hm... I tested with the latest version and noticed this strange behavior.
   I use the same query:
   ```sql
   SELECT JSON_QUERY('{"a":[{"c":null},{"c":"c2"}]}', 'lax $.a[*].c' RETURNING 
ARRAY ERROR ON ERROR);
   ```
   for this the result is 
   ```
[NULL, c2]
   ```
   
   Now the only thing I change is writing `null` in the JSON in uppercase, or with 
just some of its letters in uppercase, like
   ```sql
SELECT JSON_QUERY('{"a":[{"c":Null},{"c":"c2"}]}', 'lax $.a[*].c' RETURNING 
ARRAY ERROR ON ERROR);
   ```
   and now the result is different, just
   ```
   Did I do something wrong here? 
   
   ```





[jira] [Assigned] (FLINK-35417) JobManager and TaskManager support merging and run in a single process

2024-05-22 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-35417:
---

Assignee: Jeyhun Karimov

> JobManager and TaskManager support merging and run in a single process
> --
>
> Key: FLINK-35417
> URL: https://issues.apache.org/jira/browse/FLINK-35417
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: melin
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Flink is widely used in data integration scenarios, where the parallelism of a 
> single job is low, and in many cases a parallelism of one is enough to run a 
> task. With high availability in application mode, the large number of 
> JobManager nodes costs a lot of resources. If session mode is used instead, 
> stability is poor.
> In application mode, JobManager and TaskManager could run together in a single 
> process to achieve reliability and save resources.
>  
>  





Re: [PR] [FLINK-35216] Support for RETURNING clause of JSON_QUERY [flink]

2024-05-22 Thread via GitHub


dawidwys commented on code in PR #24704:
URL: https://github.com/apache/flink/pull/24704#discussion_r1610404628


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/JsonQueryOnErrorEmptyArgumentTypeStrategy.java:
##
@@ -0,0 +1,57 @@
+package org.apache.flink.table.types.inference.strategies;

Review Comment:
   I don’t have my template for new files properly set. I’ll fix that. Any 
other comments?






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-22 Thread via GitHub


HuangZhenQiu commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2125397607

   @davidradl 
   Throwables in the executors are already caught in the execution environment. If 
there is a better way to provide extra information to users, I am glad to adopt it. 





Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]

2024-05-22 Thread via GitHub


davidradl commented on PR #24741:
URL: https://github.com/apache/flink/pull/24741#issuecomment-2125234532

   @reswqa the CI output shows that the config.yaml is not picked up. This was 
moved into the base test calls by the fix. On the face of it, it looks like 
`@BeforeEach` is not being driven for each test, so the yaml file is not present, 
resulting in the error. Continuing to investigate.
   The error is:
   ...
   ```
   Caused by: org.apache.flink.configuration.IllegalConfigurationException: The Flink config file '/tmp/junit1139569232718897600/conf882500211154584393/flink-conf.yaml' (/tmp/junit1139569232718897600/conf882500211154584393/flink-conf.yaml) does not exist.
   May 21 17:58:07  at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:141)
   ```
   


   





Re: [PR] [FLINK-35216] Support for RETURNING clause of JSON_QUERY [flink]

2024-05-22 Thread via GitHub


snuyanzin commented on code in PR #24704:
URL: https://github.com/apache/flink/pull/24704#discussion_r1610255280


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/JsonQueryOnErrorEmptyArgumentTypeStrategy.java:
##
@@ -0,0 +1,57 @@
+package org.apache.flink.table.types.inference.strategies;

Review Comment:
   It seems this class has no license header, which is what causes the rat-check failure.






[jira] [Commented] (FLINK-35417) JobManager and TaskManager support merging and run in a single process

2024-05-22 Thread melin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848676#comment-17848676
 ] 

melin commented on FLINK-35417:
---

The current deployment model has encountered challenges in many projects. I 
just wanted to open an issue for discussion; perhaps the mailing list is more 
appropriate.
There is currently no ability to implement new deployment patterns. 
[~jeyhunkarimov] You are welcome to drive this issue. Thank you very much!

> JobManager and TaskManager support merging and run in a single process
> --
>
> Key: FLINK-35417
> URL: https://issues.apache.org/jira/browse/FLINK-35417
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: melin
>Priority: Major
>
> Flink is widely used in data integration scenarios, where the parallelism of a 
> single job is low, and in many cases a parallelism of one is enough to run a 
> task. With high availability in application mode, the large number of 
> JobManager nodes costs a lot of resources. If session mode is used instead, 
> stability is poor.
> In application mode, JobManager and TaskManager could run together in a single 
> process to achieve reliability and save resources.
>  
>  





[jira] [Commented] (FLINK-34935) TIMESTAMP_LTZ type Unsupported when using JdbcCatalog to read from Postgres

2024-05-22 Thread Pietro (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848664#comment-17848664
 ] 

Pietro commented on FLINK-34935:


Related issue: FLINK-35053

> TIMESTAMP_LTZ type Unsupported when using JdbcCatalog to read from Postgres
> ---
>
> Key: FLINK-34935
> URL: https://issues.apache.org/jira/browse/FLINK-34935
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.1
> Environment: flink: 1.17.0
> flink-connector-jdbc: 3.1.0-1.17
> postgres: 14.5
> java: 11
>Reporter: Du Yuzhou
>Priority: Major
>  Labels: flink, jdbc_connector, postgres
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> When I use JdbcCatalog to select from a Postgres table, it throws an exception:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: 
> Unsupported type:TIMESTAMP_LTZ(6)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
>     at 
> org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50)
>     at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
>     at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
>     at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
>     at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
>     at 
> org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
>     at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
>     at 
> org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
>     at 
> org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
>     at 
> org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80)
>     at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
>     at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQu

[jira] [Updated] (FLINK-35423) ARRAY_EXCEPT should support set semantics

2024-05-22 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-35423:

Description: 
After a number of discussions e.g. here [1]
It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
{{ARRAY_INTERSECT}}.

It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and has 
not been released yet, so the change should be done before 1.20.0 release to 
avoid inconsistencies.

[1] https://github.com/apache/flink/pull/24526

  was:
After a number of discussions e.g. here [1]
It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
{{ARRAY_INTERSECT}}.

It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and has 
not been released yet, so it the change should be done before release to avoid 
inconsistencies.

[1] https://github.com/apache/flink/pull/24526


> ARRAY_EXCEPT should support set semantics
> -
>
> Key: FLINK-35423
> URL: https://issues.apache.org/jira/browse/FLINK-35423
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Priority: Blocker
>
> After a number of discussions e.g. here [1]
> It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
> {{ARRAY_INTERSECT}}.
> It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and 
> has not been released yet, so the change should be done before 1.20.0 release 
> to avoid inconsistencies.
> [1] https://github.com/apache/flink/pull/24526





[jira] [Created] (FLINK-35423) ARRAY_EXCEPT should support set semantics

2024-05-22 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-35423:
---

 Summary: ARRAY_EXCEPT should support set semantics
 Key: FLINK-35423
 URL: https://issues.apache.org/jira/browse/FLINK-35423
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Sergey Nuyanzin


After a number of discussions e.g. here [1]
It was decided to follow set semantics for {{ARRAY_EXCEPT}} and 
{{ARRAY_INTERSECT}}.

It is marked as a blocker since {{ARRAY_EXCEPT}} was added in 1.20 only and has 
not been released yet, so it the change should be done before release to avoid 
inconsistencies.

[1] https://github.com/apache/flink/pull/24526





[jira] [Comment Edited] (FLINK-16105) Translate "User-defined Sources & Sinks" page of "Table API & SQL" into Chinese

2024-05-22 Thread boxes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848654#comment-17848654
 ] 

boxes edited comment on FLINK-16105 at 5/22/24 2:57 PM:


Hi, Does the documentation for user-defined sources and sinks still need to be 
improved? I hope we can do some document verification work.


was (Author: JIRAUSER305532):
Hi, Does the documentation for user-defined sources and sinks still need to be 
improved? I hope we can do some document verification work。

> Translate "User-defined Sources & Sinks" page of "Table API & SQL" into 
> Chinese 
> 
>
> Key: FLINK-16105
> URL: https://issues.apache.org/jira/browse/FLINK-16105
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sourceSinks.html
> The markdown file is located in {{flink/docs/dev/table/sourceSinks.zh.md}}





[jira] [Commented] (FLINK-16105) Translate "User-defined Sources & Sinks" page of "Table API & SQL" into Chinese

2024-05-22 Thread boxes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848654#comment-17848654
 ] 

boxes commented on FLINK-16105:
---

Hi, Does the documentation for user-defined sources and sinks still need to be 
improved? I hope we can do some document verification work。

> Translate "User-defined Sources & Sinks" page of "Table API & SQL" into 
> Chinese 
> 
>
> Key: FLINK-16105
> URL: https://issues.apache.org/jira/browse/FLINK-16105
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sourceSinks.html
> The markdown file is located in {{flink/docs/dev/table/sourceSinks.zh.md}}





Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row/map/array type [flink]

2024-05-22 Thread via GitHub


ViktorCosenza commented on PR #24795:
URL: https://github.com/apache/flink/pull/24795#issuecomment-2124999154

   > Thanks! The PR looks good to me. Hi @JingsongLi, please also take a look. 
I think we can port this to fix 
[apache/paimon#1730](https://github.com/apache/paimon/issues/1730)
   
   Hey @xccui, thanks for the quick review! Just checking in to keep this PR 
alive.
   Is there anything I can do to help get this merged?




