[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-07-12 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566180#comment-17566180
 ] 

Danny Cranmer commented on FLINK-24229:
---

[~prasaanth07] Flink connectors are being moved to new repositories with 
dedicated release lifecycles and independent versioning strategies. I plan to 
propose targeting Flink 1.15+ with this connector, so once it is released you 
can start using it right away, even on older Flink versions.

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Yuri Gusev
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple write requests (WriteRequest objects) 
> to DynamoDB for increased throughput. The implemented Sink Writer will be 
> used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
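
As a companion to the scope quoted above, here is a minimal sketch of what such a sink writer could look like, assuming Flink 1.15's AsyncSinkWriter API (FLIP-171) and the AWS SDK v2 DynamoDB async client. The class name, constructor thresholds, and size estimate are illustrative only, not the actual connector:

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: a DynamoDB writer on the FLIP-171 async sink base. */
public class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, WriteRequest> {

    private final DynamoDbAsyncClient client;
    private final String tableName;

    public DynamoDbSinkWriter(
            ElementConverter<InputT, WriteRequest> elementConverter,
            Sink.InitContext context,
            DynamoDbAsyncClient client,
            String tableName) {
        // Batching thresholds are illustrative; BatchWriteItem allows at most
        // 25 items and 16 MB per request, and 400 KB per item.
        super(elementConverter, context, 25, 50, 10_000, 16 * 1024 * 1024, 5_000, 400 * 1024);
        this.client = client;
        this.tableName = tableName;
    }

    @Override
    protected void submitRequestEntries(
            List<WriteRequest> entries, Consumer<List<WriteRequest>> requestsToRetry) {
        BatchWriteItemRequest request =
                BatchWriteItemRequest.builder()
                        .requestItems(Collections.singletonMap(tableName, entries))
                        .build();
        client.batchWriteItem(request)
                .whenComplete(
                        (response, error) -> {
                            if (error != null) {
                                // Retry the whole batch on a failed request.
                                requestsToRetry.accept(entries);
                            } else {
                                // Requeue only the items DynamoDB did not process.
                                requestsToRetry.accept(
                                        response.unprocessedItems()
                                                .getOrDefault(tableName, Collections.emptyList()));
                            }
                        });
    }

    @Override
    protected long getSizeInBytes(WriteRequest requestEntry) {
        // Crude size estimate for buffer accounting; a real implementation
        // would measure the serialized item size.
        return requestEntry.toString().length();
    }
}
```

A real implementation would also need to distinguish retriable from fatal errors instead of requeueing indefinitely.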





[GitHub] [flink] trushev commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread GitBox


trushev commented on PR #20261:
URL: https://github.com/apache/flink/pull/20261#issuecomment-1182786958

   @flinkbot run azure





[GitHub] [flink] trushev commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread GitBox


trushev commented on PR #20261:
URL: https://github.com/apache/flink/pull/20261#issuecomment-1182786906

   `CassandraConnectorITCase` failed on CI, but it shouldn't have been affected 
by these changes, and it works fine locally. Triggering the pipeline again.





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #311: [FLINK-28479] Add metrics for resource lifecycle state transitions

2022-07-12 Thread GitBox


morhidi commented on code in PR #311:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/311#discussion_r919663834


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+
+/**
+ * Utility for tracking resource lifecycle metrics globally and per namespace.
+ *
+ * @param <CR> Flink resource type.
+ */
+public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
+
+private static final String TRANSITION_FIRST_DEPLOYMENT = 
"FirstDeployment";
+private static final String TRANSITION_RESUME = "Resume";
+private static final String TRANSITION_UPGRADE = "Upgrade";
+private static final String TRANSITION_SUSPEND = "Suspend";
+private static final String TRANSITION_SUBMISSION = "Submission";
+private static final String TRANSITION_STABILIZATION = "Stabilization";
+private static final String TRANSITION_ROLLBACK = "Rollback";
+
+public static final List<Transition> TRACKED_TRANSITIONS = 
getTrackedTransitions();
+
+private final Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> 
lifecycleTrackers =
+new ConcurrentHashMap<>();
+private final Set<String> namespaces = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+
+private final FlinkConfigManager configManager;
+private final Clock clock;
+private final KubernetesOperatorMetricGroup operatorMetricGroup;
+
+private Map>> 
transitionMetrics;
+private Function metricGroupFunction;
+
+public LifecycleMetrics(
+FlinkConfigManager configManager,
+Clock clock,
+KubernetesOperatorMetricGroup operatorMetricGroup) {
+this.configManager = configManager;
+this.clock = clock;
+this.operatorMetricGroup = operatorMetricGroup;
+}
+
+public void onUpdate(CR cr) {
+
getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(), 
clock.instant());
+}
+
+public void onRemove(CR cr) {
+lifecycleTrackers.remove(
+Tuple2.of(cr.getMetadata().getNamespace(), 
cr.getMetadata().getName()));
+}
+
+private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
+init(cr);
+createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+return lifecycleTrackers.computeIfAbsent(
+Tuple2.of(cr.getMetadata().getNamespace(), 
cr.getMetadata().getName()),
+k -> {
+var initialState = cr.getStatus().getLifecycleState();
+var time =
+initialState == 

[GitHub] [flink] wanglijie95 commented on a diff in pull request #20222: [FLINK-28137] Introduce SpeculativeScheduler

2022-07-12 Thread GitBox


wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919661044


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -385,7 +386,8 @@ private SchedulerNG createScheduler(
 initializationTimestamp,
 getMainThreadExecutor(),
 fatalErrorHandler,
-jobStatusListener);
+jobStatusListener,
+new NoOpBlocklistHandler());

Review Comment:
   Maybe abstract a `BlocklistOperations` and only expose `addNewBlockedNodes` 
to scheduler
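
A rough sketch of the suggested abstraction (the interface and method names follow the comment above; `BlockedNode` is assumed to be the blocklist record type introduced by this work):

```java
import java.util.Collection;

/** Sketch only: a narrow view of the blocklist handler exposed to the scheduler. */
public interface BlocklistOperations {

    /** Adds newly detected blocked nodes, merging them with existing entries. */
    void addNewBlockedNodes(Collection<BlockedNode> newNodes);
}
```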






[GitHub] [flink] dinggege1024 commented on pull request #20255: [FLINK-28486][docs-zh] Flink FileSystem SQL Connector Doc is not right

2022-07-12 Thread GitBox


dinggege1024 commented on PR #20255:
URL: https://github.com/apache/flink/pull/20255#issuecomment-1182777301

   Hi @JingsongLi, please review this PR. I have updated it based on your 
suggestion.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20222: [FLINK-28137] Introduce SpeculativeScheduler

2022-07-12 Thread GitBox


wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919645557


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final 
FailureHandlingResult failureHandlingRe
 .values());
 final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
+if (globalRecovery) {
+log.info(
+"{} tasks will be restarted to recover from a global 
failure.",
+verticesToRestart.size());
+} else {
+
checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+log.info(
+"{} tasks will be restarted to recover the failed task 
{}.",

Review Comment:
   The printed task id was changed from `ExecutionVertexID` to 
`ExecutionAttemptID`. But I think it is more reasonable than before.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -220,46 +217,57 @@ protected void startSchedulingInternal() {
 }
 
 @Override
-protected void updateTaskExecutionStateInternal(
-final ExecutionVertexID executionVertexId,
-final TaskExecutionStateTransition taskExecutionState) {
+protected void onTaskExecutionStateUpdate(final Execution execution) {
+switch (execution.getState()) {
+case FINISHED:
+onTaskFinished(execution);
+break;
+case FAILED:
+onTaskFailed(execution);
+break;
+default:
+throw new IllegalArgumentException(
+String.format(
+"State %s should not be notified to 
DefaultScheduler.",
+execution.getState()));
+}
+}
 
+protected void onTaskFinished(final Execution execution) {
+checkState(execution.getState() == ExecutionState.FINISHED);
+
+final ExecutionVertexID executionVertexId = 
execution.getVertex().getID();
 // once a task finishes, it will release the assigned allocation/slot 
and no longer
 // needs it. Therefore, it should stop reserving the slot so that 
other tasks are
 // possible to use the slot. Ideally, the `stopReserveAllocation` 
should happen
 // along with the release slot process. However, that process is 
hidden in the depth
 // of the ExecutionGraph, so we currently do it in DefaultScheduler 
after that process
 // is done.
-if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) 
{
-stopReserveAllocation(executionVertexId);
-}
+stopReserveAllocation(executionVertexId);
 
-schedulingStrategy.onExecutionStateChange(
-executionVertexId, taskExecutionState.getExecutionState());
-maybeHandleTaskFailure(taskExecutionState, 
getCurrentExecutionOfVertex(executionVertexId));
+schedulingStrategy.onExecutionStateChange(executionVertexId, 
ExecutionState.FINISHED);

Review Comment:
   `SchedulingStrategy#onExecutionStateChange` is currently only called when a 
task finishes; how about changing it to `onExecutionFinish`? Or calling it for 
all state changes?
   
   Otherwise, it may confuse subsequent developers.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##
@@ -85,6 +85,10 @@ public Collection<Execution> getCurrentExecutions() {
 return Collections.unmodifiableCollection(currentExecutions.values());
 }
 
+public Execution getCurrentExecutionOrThrow(final ExecutionAttemptID 
attemptId) {

Review Comment:
   This method is never used.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import 

[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished

2022-07-12 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566162#comment-17566162
 ] 

Xintong Song commented on FLINK-28531:
--

[~aitozi],
Which version of Flink were you using when you encountered this problem? Could 
it already be fixed by FLINK-24491?

> Shutdown cluster after history server archive finished
> --
>
> Key: FLINK-28531
> URL: https://issues.apache.org/jira/browse/FLINK-28531
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> I met a problem where the job cluster may be shut down before the history 
> server archive file upload has finished.
> After some research, it may be caused by two reasons.
> First, {{HistoryServerArchivist#archiveExecutionGraph}} is not awaited to 
> completion.
> Second, the deregisterApp in 
> {{KubernetesResourceManagerDriver#deregisterApplication}} will directly 
> remove the deployment. So in the shutdown flow in ClusterEntrypoint, the 
> deployment deletion is triggered first, which can cause the master pod to be 
> deleted while some operations/futures have not finished.





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1

2022-07-12 Thread GitBox


morhidi commented on code in PR #316:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919652468


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
 String flink14Response =
 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
 String flink15Response =
-"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
   see my comment above






[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1

2022-07-12 Thread GitBox


morhidi commented on code in PR #316:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919652210


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
 String flink14Response =
 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
 String flink15Response =
-"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
   We could potentially remove this test, since the e2e tests would cover the 
functionality. I guess I'll just remove the flink15Response check, since the 
schema didn't change between 1.14 and 1.15. It's good to have a test showing 
the schema changes, though.









[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #210: [FLINK-28511] Improve Spark dependencies and document

2022-07-12 Thread GitBox


LadyForest commented on code in PR #210:
URL: https://github.com/apache/flink-table-store/pull/210#discussion_r919649696


##
docs/content/docs/engines/spark.md:
##
@@ -41,17 +41,33 @@ Download [flink-table-store-spark-{{< version 
>}}.jar](https://repo.maven.apache
 You are using an unreleased version of Table Store, you need to manually 
[Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source 
code.
 {{< /unstable >}}
 
-Copy Table Store Spark bundle jar to `spark/jars`.
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark-{{< version >}}.jar
+```
 
-## Table Store Catalog
+You can also copy `flink-table-store-spark-{{< version >}}.jar` to 
`spark/jars` in your Spark installation.
+
+## Catalog
 
 The following command registers the Table Store's Spark catalog with the name 
`table_store`:
 
 ```bash
-spark-sql --conf 
spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
+spark-sql ... \
+--conf 
spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
 --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse
 ```
 
+If you are using the Hive Metastore, you will need to add some configuration:

Review Comment:
   ```suggestion
   Some extra configurations are needed if your Spark application uses the Hive 
Metastore to manage metadata.
   ```
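
For context, the extra configuration referred to here is typically along the following lines; the `metastore` and `uri` option keys are assumptions based on the Table Store catalog options, not part of this diff:

```bash
spark-sql ... \
  --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
  --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse \
  --conf spark.sql.catalog.table_store.metastore=hive \
  --conf spark.sql.catalog.table_store.uri=thrift://<hive-metastore-host>:<port>
```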






[GitHub] [flink] liuzhuang2017 commented on pull request #20253: [hotfix][docs-zh] Add missing the working_directory.md file to the standalone part.

2022-07-12 Thread GitBox


liuzhuang2017 commented on PR #20253:
URL: https://github.com/apache/flink/pull/20253#issuecomment-1182767837

   @MartijnVisser, sorry to bother you again. Can you help me review this PR? 
Thank you.





[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-07-12 Thread Prasaanth (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566154#comment-17566154
 ] 

Prasaanth commented on FLINK-24229:
---

Hi all, excited to see Flink supporting a DynamoDB sink natively. Is 1.17 the 
Flink version where we can expect this? Looks like Flink 1.16 has a release 
freeze on the 25th of July this year. 

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Yuri Gusev
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple write requests (WriteRequest objects) 
> to DynamoDB for increased throughput. The implemented Sink Writer will be 
> used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #210: [FLINK-28511] Improve Spark dependencies and document

2022-07-12 Thread GitBox


LadyForest commented on code in PR #210:
URL: https://github.com/apache/flink-table-store/pull/210#discussion_r919642060


##
docs/content/docs/engines/spark.md:
##
@@ -41,17 +41,33 @@ Download [flink-table-store-spark-{{< version 
>}}.jar](https://repo.maven.apache
 You are using an unreleased version of Table Store, you need to manually 
[Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source 
code.
 {{< /unstable >}}
 
-Copy Table Store Spark bundle jar to `spark/jars`.
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark-{{< version >}}.jar
+```
 
-## Table Store Catalog
+You can also copy `flink-table-store-spark-{{< version >}}.jar` to 
`spark/jars` in your Spark installation.

Review Comment:
   ```suggestion
   Alternatively, you can copy `flink-table-store-spark-{{< version >}}.jar` 
under `spark/jars` in your Spark installation.
   ```






[GitHub] [flink] swuferhong commented on a diff in pull request #20008: [FLINK-27990][table-planner] Parquet format supports reporting statis…

2022-07-12 Thread GitBox


swuferhong commented on code in PR #20008:
URL: https://github.com/apache/flink/pull/20008#discussion_r919636056


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+/** The base class for statistics report testing. */
+public abstract class StatisticsReportTestBase extends TestLogger {
+
+protected TableEnvironment tEnv;
+protected File folder;
+
+@BeforeEach
+public void setup(@TempDir File file) throws Exception {
+folder = file;
+tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+}
+
+@AfterEach
+public void after() {
+TestValuesTableFactory.clearAllData();
+}
+
+protected void createFileSystemSource(String format) {
+// now format can be parquet, orc and csv.
+String[] properties;
+if (format.equals("orc")) {
+properties = formatOrcProperties();
+} else if (format.equals("parquet")) {
+properties = formatParquetProperties();
+} else {
+// default is Csv format
+properties = formatCsvProperties();
+}
+
+String ddl1 =
+String.format(
+"CREATE TABLE sourceTable (\n"
++ "%s"
++ ") with (\n"
++ " 'connector' = 'filesystem',"
++ " 'path' = '%s',"
++ "%s )",
+String.join(",\n", 
ddlTypesMapToStringList(ddlTypesMap())),
+folder,
+String.join(",\n", properties));
+tEnv.executeSql(ddl1);
+}
+
+private String[] formatParquetProperties() {
+List<String> ret = new ArrayList<>();
+ret.add("'format'='parquet'");
+ret.add("'parquet.utc-timezone'='true'");
+ret.add("'parquet.compression'='gzip'");
+return ret.toArray(new String[0]);
+}
+
+private String[] formatOrcProperties() {
+List<String> ret = new ArrayList<>();
+ret.add("'format'='orc'");
+ret.add("'orc.compress'='snappy'");
+return ret.toArray(new String[0]);
+}
+
+private String[] formatCsvProperties() {
+List<String> ret = new ArrayList<>();
+ret.add("'format' = 'csv'");
+return ret.toArray(new String[0]);
+}
+
+protected Map<String, String> ddlTypesMap() {
+Map<String, String> ddlTypesMap = new LinkedHashMap<>();
+ddlTypesMap.put("boolean", "a");
+ddlTypesMap.put("tinyint", "b");
+ddlTypesMap.put("smallint", "c");
+ddlTypesMap.put("int", "d");
+ddlTypesMap.put("bigint", "e");
+ddlTypesMap.put("float", "f");
+ddlTypesMap.put("double", "g");
+ddlTypesMap.put("string", "h");
+

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

2022-07-12 Thread GitBox


gaoyunhaii commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r919606689


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/**
+ * Hooks provided by users on job status changing. Triggered at the 
initial(CREATED) and final
+ * state(FINISHED/CANCELED/FAILED) of the job.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * StreamGraph streamGraph = env.getStreamGraph();
+ * streamGraph.registerJobStatusHook(myJobStatusHook);
+ * streamGraph.setJobName("my_flink");
+ * env.execute(streamGraph);
+ * }</pre>
+ */
+@Internal
+public interface JobStatusHook extends Serializable {
+
+/** When Job become {@link JobStatus#CREATED} status, it would only be 
called one time. */

Review Comment:
   `become` -> `becomes` 
   
   nit: import JobStatus to avoid warnings



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##
@@ -1618,6 +1621,141 @@ public void 
testCheckpointCleanerIsClosedAfterCheckpointServices() throws Except
 }
 }
 
+@Test
+public void testJobStatusHookWithJobFailed() throws Exception {

Review Comment:
   The three test methods seem quite similar; could we extract a common helper 
method?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##
@@ -628,4 +632,12 @@ public void writeUserArtifactEntriesToConfiguration() {
 userArtifact.getKey(), userArtifact.getValue(), 
jobConfiguration);
 }
 }
+
+public void setJobStatusHooks(List<JobStatusHook> hooks) {
+this.jobStatusHooks = hooks;

Review Comment:
   Might also `checkNotNull` ?
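
That is, something along these lines (a sketch of the suggestion, using Flink's `Preconditions.checkNotNull`):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

public void setJobStatusHooks(List<JobStatusHook> hooks) {
    // Fail fast on a null hook list instead of deferring the NPE.
    this.jobStatusHooks = checkNotNull(hooks);
}
```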






[GitHub] [flink] gaoyunhaii commented on pull request #20239: [FLINK-27703][core] Extend timeout and add logs for FileChannelManagerImplTest

2022-07-12 Thread GitBox


gaoyunhaii commented on PR #20239:
URL: https://github.com/apache/flink/pull/20239#issuecomment-1182736598

   @flinkbot run azure





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1

2022-07-12 Thread GitBox


morhidi commented on code in PR #316:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919620212


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
 String flink14Response =
 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
 String flink15Response =
-"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
   good point, I'll have a look






[GitHub] [flink] DavidLiu001 commented on pull request #20143: [FLINK-25735][content.zh] Chinese Translation - Add documentation for…

2022-07-12 Thread GitBox


DavidLiu001 commented on PR #20143:
URL: https://github.com/apache/flink/pull/20143#issuecomment-1182728130

   @MartijnVisser The review comments have been resolved. If there are no 
further comments, please help merge this PR.
   Thanks





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #211: [FLINK-28465] commit.force-compact should always be true under batch mode

2022-07-12 Thread GitBox


SteNicholas commented on code in PR #211:
URL: https://github.com/apache/flink-table-store/pull/211#discussion_r919616462


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##
@@ -132,9 +133,14 @@ private static void 
validateFileStoreContinuous(Configuration options) {
 }
 
 static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context 
context) {
-FileStoreTable table =
-FileStoreTableFactory.create(
-
Configuration.fromMap(context.getCatalogTable().getOptions()));
+boolean batch =
+context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+== RuntimeExecutionMode.BATCH;
+Map<String, String> options = new 
HashMap<>(context.getCatalogTable().getOptions());
+if (batch) {
+options.put(CoreOptions.COMMIT_FORCE_COMPACT.key(), 
String.valueOf(true));

Review Comment:
   Should the description of the config option `COMMIT_FORCE_COMPACT` explain 
that the value is forced to true in BATCH mode?
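
If so, the change could look roughly like this (a sketch; the actual option definition in `CoreOptions` may word it differently):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
        ConfigOptions.key("commit.force-compact")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to force a compaction before commit. "
                                + "Note: this is always set to true in BATCH execution mode.");
```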






[jira] [Commented] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-12 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566131#comment-17566131
 ] 

Jinzhong Li commented on FLINK-28515:
-

[~roman] [~yunta] Could you please take a look at this ticket?

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image.png
>
>
> In my case, I found that some files in the local recovery directory weren't 
> cleaned up properly after a checkpoint abort (as shown in the attached 
> picture).
> By analyzing the Flink log, I found that when the state backend completes 
> the local snapshot but the task has not completed the whole snapshot, and 
> the checkpoint is then aborted (caused by a checkpoint timeout or a network 
> error), files in the local recovery directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the 
> completed localSnapshot info can be registered into 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task 
> has completed the whole snapshot. 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not 
> completed the entire snapshot, then when checkpoint aborting is triggered, 
> the TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a 
> checkpoint, we can try to delete the corresponding localRecovery directory, 
> even if the checkpoint was never registered in TaskLocalStateStoreImpl.
>  
>  
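
A rough sketch of the proposed fix (method and helper names are assumptions based on the linked code, not a tested patch):

```java
// Sketch only: on checkpoint abort, also delete the subtask's local recovery
// directory for that checkpoint, even if the local snapshot was never
// registered with the store.
public void abortCheckpoint(long abortedCheckpointId) {
    // ... existing disposal of any state already registered for this checkpoint ...

    File checkpointDir =
            getLocalRecoveryConfig()
                    .getLocalStateDirectoryProvider()
                    .subtaskSpecificCheckpointDirectory(abortedCheckpointId);
    if (checkpointDir.exists()) {
        try {
            org.apache.flink.util.FileUtils.deleteDirectory(checkpointDir);
        } catch (IOException e) {
            LOG.warn("Could not delete local recovery directory {}.", checkpointDir, e);
        }
    }
}
```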





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #211: [FLINK-28465] commit.force-compact should always be true under batch mode

2022-07-12 Thread GitBox


SteNicholas commented on code in PR #211:
URL: https://github.com/apache/flink-table-store/pull/211#discussion_r919615647


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##
@@ -132,9 +133,14 @@ private static void 
validateFileStoreContinuous(Configuration options) {
 }
 
 static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context 
context) {
-FileStoreTable table =
-FileStoreTableFactory.create(
-
Configuration.fromMap(context.getCatalogTable().getOptions()));
+boolean batch =
+context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+== RuntimeExecutionMode.BATCH;
+Map<String, String> options = new 
HashMap<>(context.getCatalogTable().getOptions());

Review Comment:
   ```suggestion
   Configuration configuration = 
Configuration.fromMap(context.getCatalogTable().getOptions());
   configuration.setBoolean(CoreOptions.COMMIT_FORCE_COMPACT, batch);
   FileStoreTable table = FileStoreTableFactory.create(configuration);
   ```






[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-12 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566127#comment-17566127
 ] 

Yanfei Lei edited comment on FLINK-28515 at 7/13/22 3:20 AM:
-

I think this is reasonable. Although the TM will delete other checkpoints [when 
recovering|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199], 
if the JM re-allocates the task, the files in the local recovery directory of 
the aborted checkpoint would not be cleaned up properly, so it's right to 
delete them early.


was (Author: yanfei lei):
I think this is reasonable. Although TM will delete other checkpoints [when 
recovering|[https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199],]
 if JM re-allocate the task, the files in local recovery directory of aborted 
checkpoint would not be clean up properly, it's right to delete early.

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image.png
>
>
> In my case, I found that some files in the local recovery directory weren't 
> cleaned up properly after a checkpoint abort (as shown in the attached 
> picture).
> By analyzing the Flink log, I found that when the state backend completes 
> the local snapshot but the task has not completed the whole snapshot, and 
> the checkpoint is then aborted (caused by a checkpoint timeout or a network 
> error), files in the local recovery directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the 
> completed localSnapshot info can be registered into 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task 
> has completed the whole snapshot. 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not 
> completed the entire snapshot, then when checkpoint aborting is triggered, 
> the TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a 
> checkpoint, we can try to delete the corresponding localRecovery directory, 
> even if the checkpoint was never registered in TaskLocalStateStoreImpl.
>  
>  





[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-12 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566127#comment-17566127
 ] 

Yanfei Lei edited comment on FLINK-28515 at 7/13/22 3:20 AM:
-

I think this is reasonable. Although the TM will delete other checkpoints [when 
recovering|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199], 
if the JM re-allocates the task, the files in the local recovery directory of 
the aborted checkpoint would not be cleaned up properly, so it's right to 
delete them early.


was (Author: yanfei lei):
I think this is reasonable. Although TM will delete other checkpoints [when 
recovering|#L199],] , if JM re-allocate the task, the files in local recovery 
directory of aborted checkpoint would not be clean up properly, it's right to 
delete early.

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image.png
>
>
> In my case, I found that some files in the local recovery directory weren't 
> cleaned up properly after a checkpoint abort (as shown in the attached 
> picture).
> By analyzing the Flink log, I found that when the state backend completes 
> the local snapshot but the task has not completed the whole snapshot, and 
> the checkpoint is then aborted (caused by a checkpoint timeout or a network 
> error), files in the local recovery directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the 
> completed localSnapshot info can be registered into 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task 
> has completed the whole snapshot. 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not 
> completed the entire snapshot, then when checkpoint aborting is triggered, 
> the TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a 
> checkpoint, we can try to delete the corresponding localRecovery directory, 
> even if the checkpoint was never registered in TaskLocalStateStoreImpl.
>  
>  





[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

2022-07-12 Thread GitBox


Aitozi commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1182719574

   @flinkbot run azure





[jira] [Commented] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-12 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566127#comment-17566127
 ] 

Yanfei Lei commented on FLINK-28515:


I think this is reasonable. Although the TM will delete other checkpoints [when 
recovering|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java#L199], 
if the JM re-allocates the task, the files in the local recovery directory of 
the aborted checkpoint would not be cleaned up properly, so it's right to 
delete them early.

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image.png
>
>
> In my case, I found that some files in the local recovery directory weren't 
> cleaned up properly after a checkpoint abort (as shown in the attached 
> picture).
> By analyzing the Flink log, I found that when the state backend completes 
> the local snapshot but the task has not completed the whole snapshot, and 
> the checkpoint is then aborted (caused by a checkpoint timeout or a network 
> error), files in the local recovery directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the 
> completed localSnapshot info can be registered into 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task 
> has completed the whole snapshot. 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not 
> completed the entire snapshot, then when checkpoint aborting is triggered, 
> the TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a 
> checkpoint, we can try to delete the corresponding localRecovery directory, 
> even if the checkpoint was never registered in TaskLocalStateStoreImpl.
>  
>  





[GitHub] [flink] leozhangsr commented on pull request #20234: [FLINK-28475] [Connector/kafka] Stopping offset can be 0

2022-07-12 Thread GitBox


leozhangsr commented on PR #20234:
URL: https://github.com/apache/flink/pull/20234#issuecomment-1182716948

   @flinkbot run azure





[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished

2022-07-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566124#comment-17566124
 ] 

Aitozi commented on FLINK-28531:


cc [~wangyang0918] [~xtsong]

> Shutdown cluster after history server archive finished
> --
>
> Key: FLINK-28531
> URL: https://issues.apache.org/jira/browse/FLINK-28531
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> I met a problem where the job cluster may be shut down before the history 
> server archive file upload has finished.
> After some research, it may be caused by two reasons.
> First, {{HistoryServerArchivist#archiveExecutionGraph}} is not awaited to 
> completion.
> Second, the deregisterApp in 
> {{KubernetesResourceManagerDriver#deregisterApplication}} will directly 
> remove the deployment. So in the shutdown flow in ClusterEntrypoint, the 
> deployment deletion is triggered first, which can cause the master pod to be 
> deleted while some operations/futures have not finished.





[jira] [Assigned] (FLINK-28122) Translate "Overview " and "Project Configuration" in "User-defined Sources & Sinks" page

2022-07-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-28122:
---

Assignee: hunter

> Translate "Overview " and "Project Configuration" in "User-defined Sources & 
> Sinks" page 
> -
>
> Key: FLINK-28122
> URL: https://issues.apache.org/jira/browse/FLINK-28122
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Reporter: Chengkai Yang
>Assignee: hunter
>Priority: Minor
>
> The links are
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview
> and 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#project-configuration





[jira] [Commented] (FLINK-28531) Shutdown cluster after history server archive finished

2022-07-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566123#comment-17566123
 ] 

Aitozi commented on FLINK-28531:


I propose to fix this in two ways:

First, in the Dispatcher, also add the archive future to the 
jobTerminationFuture so that it has finished before shutdown.
Second, avoid deleting the master pod in deregisterApp, and only delete the 
cluster after the ClusterEntrypoint terminationFuture has finished.
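
A minimal sketch of the first idea, assuming `HistoryServerArchivist#archiveExecutionGraph` returns a `CompletableFuture<Acknowledge>` (the surrounding field names are illustrative):

```java
// Sketch only: chain the archive future into the job termination future so
// that cluster shutdown waits for the history server upload (best effort).
CompletableFuture<Acknowledge> archiveFuture =
        historyServerArchivist.archiveExecutionGraph(executionGraphInfo);

CompletableFuture<Void> jobTerminationFuture =
        archiveFuture
                // Don't block shutdown forever on archiving errors.
                .exceptionally(failure -> Acknowledge.get())
                .thenCompose(ack -> jobManagerRunnerTerminationFuture);
```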



> Shutdown cluster after history server archive finished
> --
>
> Key: FLINK-28531
> URL: https://issues.apache.org/jira/browse/FLINK-28531
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Priority: Major
>
> I met a problem where the job cluster may be shut down before the history 
> server archive file upload has finished.
> After some research, it may be caused by two reasons.
> First, {{HistoryServerArchivist#archiveExecutionGraph}} is not awaited to 
> completion.
> Second, the deregisterApp in 
> {{KubernetesResourceManagerDriver#deregisterApplication}} will directly 
> remove the deployment. So in the shutdown flow in ClusterEntrypoint, the 
> deployment deletion is triggered first, which can cause the master pod to be 
> deleted while some operations/futures have not finished.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before comple

2022-07-12 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566122#comment-17566122
 ] 

Yanfei Lei commented on FLINK-28529:


Sure, it looks like it's caused by the checkpoint expiring; I will check it.

> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>  failed with CheckpointException: Checkpoint expired before completing
> ---
>
> Key: FLINK-28529
> URL: https://issues.apache.org/jira/browse/FLINK-28529
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] 
> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>   Time elapsed: 617.048 s  <<< ERROR!
> 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
> before completing.
> 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49  at 
> org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125)
> 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49  at 
> 

[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1

2022-07-12 Thread GitBox


bgeng777 commented on code in PR #316:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919604652


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
 String flink14Response =
 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
 String flink15Response =
-"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
   Maybe the flink-revision number/date should be changed to 1.15.1's as well. 
The revision number should be 
[f494be6](https://github.com/apache/flink/commit/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47).
 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28531) Shutdown cluster after history server archive finished

2022-07-12 Thread Aitozi (Jira)
Aitozi created FLINK-28531:
--

 Summary: Shutdown cluster after history server archive finished
 Key: FLINK-28531
 URL: https://issues.apache.org/jira/browse/FLINK-28531
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Aitozi


I met a problem where the job cluster may be shut down before the history 
server archive file upload has finished.

After some research, it may be caused by two reasons.

First, the {{HistoryServerArchivist#archiveExecutionGraph}} future is not 
waited on to complete.
Second, the deregisterApp in 
{{KubernetesResourceManagerDriver#deregisterApplication}} will directly remove 
the deployment. So in the shutdown flow in ClusterEntrypoint, the deployment 
deletion is triggered first, which causes the master pod to be deleted while 
some operations/futures have not finished



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #316: [FLINK-28517] Bump Flink version to 1.15.1

2022-07-12 Thread GitBox


wangyang0918 commented on code in PR #316:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/316#discussion_r919604132


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java:
##
@@ -293,7 +293,7 @@ public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
 String flink14Response =
 "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
 String flink15Response =
-"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.1\",\"flink-revision\":\"3a4c113
 @ 
2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

Review Comment:
   Do we really need to bump the version in the tests every time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread GitBox


flinkbot commented on PR #20261:
URL: https://github.com/apache/flink/pull/20261#issuecomment-1182705940

   
   ## CI report:
   
   * 221a9ecd46069d80583c766bf39f095714801f88 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-12 Thread GitBox


Tartarus0zm commented on PR #20252:
URL: https://github.com/apache/flink/pull/20252#issuecomment-1182705141

   @wuchong  @godfreyhe  @lsyldliu  @luoyuxia @beyond1920   please take a look 
if you have time, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28122) Translate "Overview " and "Project Configuration" in "User-defined Sources & Sinks" page

2022-07-12 Thread hunter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566118#comment-17566118
 ] 

hunter commented on FLINK-28122:


[~jark] Hi Jark, I am very interested in this; I can do the work if needed.

> Translate "Overview " and "Project Configuration" in "User-defined Sources & 
> Sinks" page 
> -
>
> Key: FLINK-28122
> URL: https://issues.apache.org/jira/browse/FLINK-28122
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Reporter: Chengkai Yang
>Priority: Minor
>
> The links are
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview
> and 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#project-configuration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] godfreyhe commented on a diff in pull request #20246: [FLINK-28074][table-planner] show statistics details for DESCRIBE EXT…

2022-07-12 Thread GitBox


godfreyhe commented on code in PR #20246:
URL: https://github.com/apache/flink/pull/20246#discussion_r919600208


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##
@@ -397,7 +398,7 @@ class TableEnvironmentTest {
   () => tableEnv.executeSql("SELECT c FROM my_view /*+ 
OPTIONS('is-bounded' = 'true') */"))
   .hasMessageContaining("View 
'`default_catalog`.`default_database`.`my_view`' " +
 "cannot be enriched with new options. Hints can only be applied to 
tables.")
-  .isInstanceOf(classOf[ValidationException])
+  .isInstanceOf[ValidationException]

Review Comment:
   `isInstanceOf` here is the `AbstractAssert` method, not the Scala one; please 
revert the change



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/DescExtendedStyle.java:
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * Print desc extended result.
+ *
+ * For example: (printRowKind is false)
+ *
+ * 
+ *  col_name data_type hasNull
+ *  firstSTRINGTRUE
+ *  second   INT   TRUE
+ *  thirdSTRINGTRUE
+ *
+ *  # Detailed Table Information
+ *  Table Statistics:
+ *rowCount:10
+ *fileCount:   2
+ *rawDataSize: 100
+ *totalSize:   10
+ * 
+ *
+ * From this example, we can see that this print style can print different 
contents in different
+ * parts and calculate the column widths for different parts respectively.
+ */
+@Internal
+public final class DescExtendedStyle implements PrintStyle {

Review Comment:
   I would like to introduce a combined `PrintStyle`, which contains multiple 
`PrintStyle`s, each of which can have its own style.

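   For illustration, a rough sketch of such a combined style; the names here 
and the exact `PrintStyle#print` signature are assumptions, not the actual 
design:

{code:java}
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.print.PrintStyle;

/** Sketch: each section renders its own rows with its own delegate style. */
public final class CombinedPrintStyle implements PrintStyle {

    /** Pairs a delegate style with the rows it should render. */
    public static final class Section {
        private final PrintStyle style;
        private final Iterator<RowData> rows;

        public Section(PrintStyle style, Iterator<RowData> rows) {
            this.style = style;
            this.rows = rows;
        }
    }

    private final List<Section> sections;

    public CombinedPrintStyle(List<Section> sections) {
        this.sections = sections;
    }

    @Override
    public void print(Iterator<RowData> ignoredRows, PrintWriter printWriter) {
        // Delegate section by section; each style computes its own column widths.
        for (Section section : sections) {
            section.style.print(section.rows, printWriter);
        }
    }
}
{code}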


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28530) Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28530:
---
Labels: pull-request-available  (was: )

> Improvement of extraction of conditions that can be pushed into join inputs
> ---
>
> Key: FLINK-28530
> URL: https://issues.apache.org/jira/browse/FLINK-28530
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Alexander Trushev
>Priority: Major
>  Labels: pull-request-available
>
> Conditions extraction in batch mode was introduced in FLINK-12509 and in 
> stream mode in FLINK-24139.
> h2. Proposal
> This ticket is aimed at replacing the current extraction algorithm with a new 
> one that covers a more complex case with deeply nested predicates:
> for all n > 0
> (...(((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) 
> => (a0 or a1 or ... or an)
> *Example.* For n = 3 Flink does not extract (a0 or a1 or a2 or a3):
> {code:java}
> FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and 
> b1=0) or a2=0) and b2=0) or a3=0;
> == Optimized Physical Plan ==
> Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), 
> =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, 
> a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[single])
> :  +- TableSourceScan(table=[[default_catalog, default_database, A]], 
> fields=[a0, a1, a2, a3, a4])
> +- Exchange(distribution=[single])
>+- TableSourceScan(table=[[default_catalog, default_database, B]], 
> fields=[b0, b1, b2, b3, b4])
> {code}
> while PostgreSQL does:
> {code:java}
> postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) 
> and b1=0) or a2=0) and b2=0) or a3=0);
>   QUERY PLAN
> --
>  Nested Loop  (cost=0.00..1805.09 rows=14632 width=40)
>    Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 
> 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0))
>->  Seq Scan on b  (cost=0.00..27.00 rows=1700 width=20)
>->  Materialize  (cost=0.00..44.17 rows=34 width=20)
>  ->  Seq Scan on a  (cost=0.00..44.00 rows=34 width=20)
>Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0))
> {code}
> h2. Details
> Pseudocode of new algorithm:
> f – predicate
> rel – table
> var(rel) – columns
> {code:java}
> extract(f, rel)
>   if f = AND(left, right)
>     return AND(extract(left, rel), extract(right, rel))
>   if f = OR(left, right)
>     return OR(extract(left, rel), extract(right, rel))
>   if var(f) subsetOf var(rel)
>     return f
>   return True
> AND(f, True) = AND(True, f) = f
> OR(f, True) = OR(True, f) = True
> {code}
> This algorithm covers deeply nested predicates and does not use CNF, which 
> increases the length of the predicate to O(n * e^n) in the worst case.
> The same recursive approach is used in [PostgreSQL 
> orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252]
>  and [Apache Spark 
> predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

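To make the recursion concrete, here is a minimal, self-contained Java sketch 
of the extraction above over a toy expression tree; it mirrors the pseudocode 
only and is not the actual `FlinkRexExtract` implementation, which works on 
Calcite RexNodes:

{code:java}
import java.util.Collections;
import java.util.Set;

/** Toy expression tree used only for illustration: AND/OR nodes and column leaves. */
abstract class Expr {
    static final Expr TRUE = new Leaf(Collections.<String>emptySet());

    static final class And extends Expr {
        final Expr left;
        final Expr right;

        And(Expr left, Expr right) {
            this.left = left;
            this.right = right;
        }
    }

    static final class Or extends Expr {
        final Expr left;
        final Expr right;

        Or(Expr left, Expr right) {
            this.left = left;
            this.right = right;
        }
    }

    static final class Leaf extends Expr {
        final Set<String> columns; // columns referenced by this predicate

        Leaf(Set<String> columns) {
            this.columns = columns;
        }
    }
}

final class ConditionExtractor {

    /** Returns a condition implied by f that only references the given table's columns. */
    static Expr extract(Expr f, Set<String> tableColumns) {
        if (f instanceof Expr.And) {
            Expr.And and = (Expr.And) f;
            return mkAnd(extract(and.left, tableColumns), extract(and.right, tableColumns));
        }
        if (f instanceof Expr.Or) {
            Expr.Or or = (Expr.Or) f;
            return mkOr(extract(or.left, tableColumns), extract(or.right, tableColumns));
        }
        Expr.Leaf leaf = (Expr.Leaf) f;
        return tableColumns.containsAll(leaf.columns) ? f : Expr.TRUE;
    }

    // AND(f, True) = AND(True, f) = f
    private static Expr mkAnd(Expr l, Expr r) {
        if (l == Expr.TRUE) {
            return r;
        }
        if (r == Expr.TRUE) {
            return l;
        }
        return new Expr.And(l, r);
    }

    // OR(f, True) = OR(True, f) = True
    private static Expr mkOr(Expr l, Expr r) {
        return (l == Expr.TRUE || r == Expr.TRUE) ? Expr.TRUE : new Expr.Or(l, r);
    }
}
{code}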

[GitHub] [flink] trushev opened a new pull request, #20261: [FLINK-28530] Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread GitBox


trushev opened a new pull request, #20261:
URL: https://github.com/apache/flink/pull/20261

   
   ## What is the purpose of the change
   
   This PR is aimed at replacing the current extraction algorithm with a new one 
that covers a more complex case with deeply nested predicates:
   for all n > 0
   (...(((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) 
=> (a0 or a1 or ... or an)
   
   ## Brief change log
   
 - Introduced new algorithm placed in `FlinkRexExtract`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added unit tests for extraction `FlinkRexExtractTest`
 - Added test to `JoinTest` to verify changed sql plan
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28265) Inconsistency in Kubernetes HA service: broken state handle

2022-07-12 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566116#comment-17566116
 ] 

Yang Wang commented on FLINK-28265:
---

Since we always add the new checkpoint first and then subsume the oldest one, I 
am curious how it could happen that we only have one checkpoint, which is 
invalid. If adding the new checkpoint failed, we should have the old successful 
checkpoint. Conversely, if subsuming the oldest one failed, we should still 
have the newly added checkpoint.

 

Could you please verify that checkpoint 9701 or 9703 exists on S3?

 

I believe the logs of the previous run (e.g. kubectl logs  --previous) 
and the Kubernetes APIServer audit log will help a lot in debugging the root cause.

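To spell out the invariant, a schematic sketch; this is a toy, not the real 
completed checkpoint store, and all names are illustrative:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy completed-checkpoint store illustrating the add-then-subsume ordering:
 * a failure at either step still leaves at least one valid checkpoint behind.
 */
final class ToyCheckpointStore {
    private final Deque<Long> completed = new ArrayDeque<>();
    private final int maxRetained;

    ToyCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void addCheckpointAndSubsumeOldest(long checkpointId) {
        // Step 1: store the new checkpoint. If this fails, the previously
        // completed checkpoints are untouched and remain usable for recovery.
        completed.addLast(checkpointId);

        // Step 2: subsume the oldest checkpoints. If this fails, the new
        // checkpoint from step 1 has already been stored successfully.
        while (completed.size() > maxRetained) {
            completed.removeFirst();
        }
    }
}
{code}
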
> Inconsistency in Kubernetes HA service: broken state handle
> ---
>
> Key: FLINK-28265
> URL: https://issues.apache.org/jira/browse/FLINK-28265
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
> Attachments: flink_checkpoint_issue.txt
>
>
> I have a JobManager, which at some point failed to acknowledge a checkpoint:
> {code}
> Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
> the pending checkpoint 193393. Failure reason: Failure to finalize checkpoint.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1255)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: 
> org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: 
> checkpointID-0193393 already exists in ConfigMap 
> cm--jobmanager-leader
>   at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.getKeyAlreadyExistException(KubernetesStateHandleStore.java:534)
>   at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$addAndLock$0(KubernetesStateHandleStore.java:155)
>   at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:316)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source)
>   ... 3 common frames omitted
> {code}
> the JobManager creates subsequent checkpoints successfully.
> Upon failure, it tries to recover this checkpoint (0193393), but 
> fails to do so because of:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve 
> checkpoint 193393 from state handle under checkpointID-0193393. 
> This indicates that the retrieved state handle is broken. Try cleaning the 
> state handle store ... Caused by: java.io.FileNotFoundException: No such file 
> or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c
> {code}
> I'm running Flink 1.14.4.
> Note: This issue has been first discussed here: 
> https://github.com/apache/flink/pull/15832#pullrequestreview-1005973050 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on pull request #20247: introduce operation execution plugin

2022-07-12 Thread GitBox


luoyuxia commented on PR #20247:
URL: https://github.com/apache/flink/pull/20247#issuecomment-1182702339

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #20251: [FLINK-26412][hive] Hive dialect supports "create function using jar"

2022-07-12 Thread GitBox


luoyuxia commented on PR #20251:
URL: https://github.com/apache/flink/pull/20251#issuecomment-1182701721

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26412) Hive dialect supports "CREATE FUNCTION USING xxx.jar"

2022-07-12 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-26412:
-
Summary: Hive dialect supports "CREATE FUNCTION USING xxx.jar"  (was: Hive 
dialect supports "CREATE TEMPORARY FUNCTION USING xxx.jar")

> Hive dialect supports "CREATE FUNCTION USING xxx.jar"
> -
>
> Key: FLINK-26412
> URL: https://issues.apache.org/jira/browse/FLINK-26412
> Project: Flink
>  Issue Type: Sub-task
>Reporter: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> In Hive, it's supported to use SQL like 
> {code:java}
> CREATE TEMPORARY FUNCTION USING xxx.jar
> {code}
> to create a UDF.
> It also needs to be supported by the Hive dialect in Flink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26412) Hive dialect supports "CREATE FUNCTION USING xxx.jar"

2022-07-12 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-26412:
-
Description: 
In Hive, it's supported to use SQL like
{code:java}
CREATE FUNCTION USING xxx.jar
{code}
to create a UDF.
It also needs to be supported by the Hive dialect in Flink

  was:
In Hive, it's supported to use SQL like 
{code:java}
CREATE TEMPORARY FUNCTION USING xxx.jar
{code}
to create a UDF.
It also needs to be supported by the Hive dialect in Flink


> Hive dialect supports "CREATE FUNCTION USING xxx.jar"
> -
>
> Key: FLINK-26412
> URL: https://issues.apache.org/jira/browse/FLINK-26412
> Project: Flink
>  Issue Type: Sub-task
>Reporter: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> In Hive, it's supported to use SQL like
> {code:java}
> CREATE FUNCTION USING xxx.jar
> {code}
> to create a UDF.
> It also needs to be supported by the Hive dialect in Flink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers

2022-07-12 Thread GitBox


flinkbot commented on PR #20260:
URL: https://github.com/apache/flink/pull/20260#issuecomment-1182700946

   
   ## CI report:
   
   * 0c6876ba81e8834bf4846331783638f29b619884 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-24900) Support to run multiple shuffle plugins in one session cluster

2022-07-12 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-24900:

Fix Version/s: 1.17.0

> Support to run multiple shuffle plugins in one session cluster
> --
>
> Key: FLINK-24900
> URL: https://issues.apache.org/jira/browse/FLINK-24900
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, one Flink cluster can only use one shuffle plugin. However, there 
> are cases where different jobs may need different shuffle implementations. By 
> loading shuffle plugins with the plugin manager and letting jobs select their 
> shuffle service freely, Flink can support running multiple shuffle plugins in 
> one session cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #20259: [FLINK-28525][hive] Fix unstable test for HiveDialectITCase#testTableWithSubDirsInPartitionDir

2022-07-12 Thread GitBox


flinkbot commented on PR #20259:
URL: https://github.com/apache/flink/pull/20259#issuecomment-1182700677

   
   ## CI report:
   
   * af4fd0821d53c752d5fe88982e17af7cfd0cbb6b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566114#comment-17566114
 ] 

Huang Xingbo commented on FLINK-28525:
--

Thanks for the quick fix.

> HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with 
> AssertJMultipleFailuresError
> -
>
> Key: FLINK-28525
> URL: https://issues.apache.org/jira/browse/FLINK-28525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: luoyuxia
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
>   Time elapsed: 16.31 s  <<< FAILURE!
> 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
> org.assertj.core.error.AssertJMultipleFailuresError: 
> 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
> 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
> 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not 
> a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
> 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
> 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
> [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
> 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
> 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27  ...(75 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
> create input splits.
> 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
> 2022-07-12T04:20:27.960Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
> 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
> 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27  ...(79 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
> java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
> file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
> 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27  ...(81 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
> file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27  at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
> 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
> 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
> 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27  ...(4 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed)]
> 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
> requirements but none did:
> 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
> org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27  at 
> 

[GitHub] [flink] yangjunhan commented on pull request #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers

2022-07-12 Thread GitBox


yangjunhan commented on PR #20260:
URL: https://github.com/apache/flink/pull/20260#issuecomment-1182699144

   Hi, @simplejason. Could you please help review this PR regarding the 
runtime-web changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566112#comment-17566112
 ] 

luoyuxia commented on FLINK-28525:
--

Thanks for reporting it. I'll fix it as soon as possible.

The reason is that it may read any of the paths first, but the test assumes it 
will always read one specific path first, which makes the test unstable.

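One possible shape of the fix, sketched with plain AssertJ; the query and 
assertion helpers here are illustrative, not the actual test code:

{code:java}
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.flink.util.ExceptionUtils;

// Assert only on the stable part of the error message, since either the
// hr=1 or the hr=2 sub-directory may be read first.
assertThatThrownBy(() -> tableEnv.executeSql("select * from fact_tz").await())
        .satisfies(
                t ->
                        assertThat(ExceptionUtils.stringifyException(t))
                                .contains("Not a file")
                                .contains("hive_warehouse/fact_tz/ds=1/hr="));
{code}
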
> HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with 
> AssertJMultipleFailuresError
> -
>
> Key: FLINK-28525
> URL: https://issues.apache.org/jira/browse/FLINK-28525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: luoyuxia
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
>   Time elapsed: 16.31 s  <<< FAILURE!
> 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
> org.assertj.core.error.AssertJMultipleFailuresError: 
> 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
> 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
> 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not 
> a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
> 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
> 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
> [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
> 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
> 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27  ...(75 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
> create input splits.
> 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
> 2022-07-12T04:20:27.960Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
> 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
> 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27  ...(79 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
> java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
> file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
> 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27  ...(81 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
> file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27  at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
> 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
> 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
> 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27  ...(4 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed)]
> 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
> requirements but none did:
> 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
> org.apache.flink.connectors.hive.FlinkHiveException: 

[jira] [Updated] (FLINK-28530) Improvement of extraction of conditions that can be pushed into join inputs

2022-07-12 Thread Alexander Trushev (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Trushev updated FLINK-28530:
--
Summary: Improvement of extraction of conditions that can be pushed into 
join inputs  (was: Improvement of conditions extraction that can be pushed into 
join inputs)

> Improvement of extraction of conditions that can be pushed into join inputs
> ---
>
> Key: FLINK-28530
> URL: https://issues.apache.org/jira/browse/FLINK-28530
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Alexander Trushev
>Priority: Major
>
> Conditions extraction in batch mode was introduced in FLINK-12509 and in 
> stream mode in FLINK-24139.
> h2. Proposal
> This ticket is aimed at replacing the current extraction algorithm with a new 
> one that covers a more complex case with deeply nested predicates:
> for all n > 0
> (...(((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) 
> => (a0 or a1 or ... or an)
> *Example.* For n = 3 Flink does not extract (a0 or a1 or a2 or a3):
> {code:java}
> FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and 
> b1=0) or a2=0) and b2=0) or a3=0;
> == Optimized Physical Plan ==
> Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), 
> =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, 
> a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[single])
> :  +- TableSourceScan(table=[[default_catalog, default_database, A]], 
> fields=[a0, a1, a2, a3, a4])
> +- Exchange(distribution=[single])
>+- TableSourceScan(table=[[default_catalog, default_database, B]], 
> fields=[b0, b1, b2, b3, b4])
> {code}
> while PostgreSQL does:
> {code:java}
> postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) 
> and b1=0) or a2=0) and b2=0) or a3=0);
>   QUERY PLAN
> --
>  Nested Loop  (cost=0.00..1805.09 rows=14632 width=40)
>    Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 
> 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0))
>->  Seq Scan on b  (cost=0.00..27.00 rows=1700 width=20)
>->  Materialize  (cost=0.00..44.17 rows=34 width=20)
>  ->  Seq Scan on a  (cost=0.00..44.00 rows=34 width=20)
>Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0))
> {code}
> h2. Details
> Pseudocode of new algorithm:
> f – predicate
> rel – table
> var(rel) – columns
> {code:java}
> extract(f, rel)
>   if f = AND(left, right)
>     return AND(extract(left, rel), extract(right, rel))
>   if f = OR(left, right)
>     return OR(extract(left, rel), extract(right, rel))
>   if var(f) subsetOf var(rel)
>     return f
>   return True
> AND(f, True) = AND(True, f) = f
> OR(f, True) = OR(True, f) = True
> {code}
> This algorithm covers deeply nested predicates and does not use CNF, which 
> increases the length of the predicate to O(n * e^n) in the worst case.
> The same recursive approach is used in [PostgreSQL 
> orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252]
>  and [Apache Spark 
> predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28315) [UI] Introduce aggregate stats in tables of the subtasks and taskmanagers

2022-07-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28315:
---
Labels: pull-request-available  (was: )

> [UI] Introduce aggregate stats in tables of the subtasks and taskmanagers
> -
>
> Key: FLINK-28315
> URL: https://issues.apache.org/jira/browse/FLINK-28315
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.16.0
>Reporter: Junhan Yang
>Assignee: Junhan Yang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yangjunhan opened a new pull request, #20260: [FLINK-28315][runtime-web] introduce aggregate stats in tables of the subtasks and taskmanagers

2022-07-12 Thread GitBox


yangjunhan opened a new pull request, #20260:
URL: https://github.com/apache/flink/pull/20260

   
   
   ## What is the purpose of the change
   
   A subtask of 
[FLIP-241](https://cwiki.apache.org/confluence/display/FLINK/FLIP-241%3A+Completed+Jobs+Information+Enhancement);
 the referenced issue is FLINK-28315.
   
   
   ## Brief change log
   
   Split the per-subtask metrics and aggregated metrics into tabs, and add a 
table of aggregated metrics.
   
   Extend the original subtasks table by adding extra columns:
   - "Accumulated Time"
   - "Status Durations" 
   
   Under the TaskManagers tab, each TaskManager row supports an action for 
viewing the "Aggregated Metrics". The modal's content table is identical to 
the one under the Subtasks tab.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28524) Translate "User-defined Sources & Sinks"

2022-07-12 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566111#comment-17566111
 ] 

Jark Wu commented on FLINK-28524:
-

Sorry, there is already an issue for this: FLINK-16105.

> Translate "User-defined Sources & Sinks"
> 
>
> Key: FLINK-28524
> URL: https://issues.apache.org/jira/browse/FLINK-28524
> Project: Flink
>  Issue Type: Improvement
>Reporter: hunter
>Priority: Minor
>
> The file is docs/content.zh/docs/dev/table/sourcesSinks.md
> The link is 
> [用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28524) Translate "User-defined Sources & Sinks"

2022-07-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28524.
---
Resolution: Duplicate

> Translate "User-defined Sources & Sinks"
> 
>
> Key: FLINK-28524
> URL: https://issues.apache.org/jira/browse/FLINK-28524
> Project: Flink
>  Issue Type: Improvement
>Reporter: hunter
>Priority: Minor
>
> The file is docs/content.zh/docs/dev/table/sourcesSinks.md
> The link is 
> [用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28530) Improvement of conditions extraction that can be pushed into join inputs

2022-07-12 Thread Alexander Trushev (Jira)
Alexander Trushev created FLINK-28530:
-

 Summary: Improvement of conditions extraction that can be pushed 
into join inputs
 Key: FLINK-28530
 URL: https://issues.apache.org/jira/browse/FLINK-28530
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Alexander Trushev


Conditions extraction in batch mode was introduced in FLINK-12509 and in 
stream mode in FLINK-24139.
h2. Proposal

This ticket is aimed at replacing the current extraction algorithm with a new 
one that covers a more complex case with deeply nested predicates:
for all n > 0
(...(((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) => 
(a0 or a1 or ... or an)

*Example.* For n = 3 Flink does not extract (a0 or a1 or a2 or a3):
{code:java}
FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and 
b1=0) or a2=0) and b2=0) or a3=0;

== Optimized Physical Plan ==
Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), 
=(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, a3, 
a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, A]], 
fields=[a0, a1, a2, a3, a4])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, B]], 
fields=[b0, b1, b2, b3, b4])
{code}
while PostgreSQL does:
{code:java}
postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) and 
b1=0) or a2=0) and b2=0) or a3=0);
  QUERY PLAN
--
 Nested Loop  (cost=0.00..1805.09 rows=14632 width=40)
   Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 0)) 
OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0))
   ->  Seq Scan on b  (cost=0.00..27.00 rows=1700 width=20)
   ->  Materialize  (cost=0.00..44.17 rows=34 width=20)
 ->  Seq Scan on a  (cost=0.00..44.00 rows=34 width=20)
   Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0))
{code}
h2. Details

Pseudocode of new algorithm:

f – predicate
rel – table
var(rel) – columns
{code:java}
extract(f, rel)
  if f = AND(left, right)
    return AND(extract(left, rel), extract(right, rel))
  if f = OR(left, right)
    return OR(extract(left, rel), extract(right, rel))
  if var(f) subsetOf var(rel)
    return f
  return True

AND(f, True) = AND(True, f) = f
OR(f, True) = OR(True, f) = True
{code}
This algorithm covers deeply nested predicates and does not use CNF, which 
increases the length of the predicate to O(n * e^n) in the worst case.

The same recursive approach is used in [PostgreSQL 
orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252]
 and [Apache Spark 
predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26993) CheckpointCoordinatorTest#testMinCheckpointPause

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566110#comment-17566110
 ] 

Huang Xingbo commented on FLINK-26993:
--

instance in release-1.14: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38058=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a

> CheckpointCoordinatorTest#testMinCheckpointPause
> 
>
> Key: FLINK-26993
> URL: https://issues.apache.org/jira/browse/FLINK-26993
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.1, 1.16.0
>
>
> The test triggers checkpoints, waits for the CC to have stored a pending 
> checkpoint, and then sends an acknowledge.
> The acknowledge can fail with an NPE because the 
> PendingCheckpoint#checkpointTargetLocation hasn't been set yet. This doesn't 
> happen synchronously with the PendingCheckpoint being added to 
> CheckpointCoordinator#pendingCheckpoints.
> {code}
> Apr 01 19:57:36 [ERROR] 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.testMinCheckpointPause
>   Time elapsed: 0.012 s  <<< ERROR!
> Apr 01 19:57:36 org.apache.flink.runtime.checkpoint.CheckpointException: 
> Could not finalize the pending checkpoint 1. Failure reason: Failure to 
> finalize checkpoint.
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241)
> Apr 01 19:57:36   at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> ...
> Apr 01 19:57:36 Caused by: java.lang.NullPointerException
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:327)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337)
> Apr 01 19:57:36   ... 50 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
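For context, a sketch of the kind of wait that would avoid this race; the 
`isInitialized` predicate is a hypothetical stand-in for checking that the 
checkpoint storage location has been set, and this is not the actual fix:

{code:java}
// Test-only busy-wait: do not acknowledge until the pending checkpoint exists
// AND has been fully initialized (hypothetical predicate).
long deadlineMillis = System.currentTimeMillis() + 10_000L;
PendingCheckpoint pending = null;
while (System.currentTimeMillis() < deadlineMillis) {
    if (!coordinator.getPendingCheckpoints().isEmpty()) {
        pending = coordinator.getPendingCheckpoints().values().iterator().next();
        if (pending.isInitialized()) { // hypothetical predicate
            break;
        }
    }
    Thread.sleep(10);
}
{code}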


[jira] [Updated] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28525:
---
Labels: pull-request-available test-stability  (was: test-stability)

> HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with 
> AssertJMultipleFailuresError
> -
>
> Key: FLINK-28525
> URL: https://issues.apache.org/jira/browse/FLINK-28525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: luoyuxia
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
>   Time elapsed: 16.31 s  <<< FAILURE!
> 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
> org.assertj.core.error.AssertJMultipleFailuresError: 
> 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
> 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
> 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not 
> a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
> 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
> 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
> [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
> 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
> 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27  ...(75 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
> create input splits.
> 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
> 2022-07-12T04:20:27.960Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
> 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
> 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27  ...(79 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
> java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
> file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
> 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27  ...(81 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
> file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27  at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
> 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
> 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
> 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27  ...(4 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed)]
> 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
> requirements but none did:
> 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
> org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27  at 
> 

[GitHub] [flink] luoyuxia opened a new pull request, #20259: [FLINK-28525][hive] Fix unstable test for HiveDialectITCase#testTableWithSubDirsInPartitionDir

2022-07-12 Thread GitBox


luoyuxia opened a new pull request, #20259:
URL: https://github.com/apache/flink/pull/20259

   
   
   ## What is the purpose of the change
   To fix the unstable test for 
`HiveDialectITCase#testTableWithSubDirsInPartitionDir`
   
   
   ## Brief change log
  - Change `satisfies` to `satisfiesAnyOf`, since the test may read either 
partition file first and throw the corresponding exception (see the sketch 
below).
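
   For context, AssertJ's `satisfies` requires the one given requirement to 
hold, while `satisfiesAnyOf` passes when at least one does, so the assertion 
no longer depends on which partition directory is read first. A minimal 
illustration (a sketch, not the actual test code; the messages echo the 
failure report):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   class SatisfiesAnyOfSketch {
       void assertEitherPartitionFailed(Exception exception) {
           // Passes if the stack trace mentions either partition directory,
           // whichever one the split enumerator happened to read first.
           assertThat(exception)
                   .satisfiesAnyOf(
                           e -> assertThat(e).hasStackTraceContaining("ds=1/hr=1"),
                           e -> assertThat(e).hasStackTraceContaining("ds=1/hr=2"));
       }
   }
   ```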
   
   
   ## Verifying this change
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? N/A
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28305) SQL Client end-to-end test failed with ElasticsearchException

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566109#comment-17566109
 ] 

Huang Xingbo commented on FLINK-28305:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38058=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c

> SQL Client end-to-end test failed with ElasticsearchException
> -
>
> Key: FLINK-28305
> URL: https://issues.apache.org/jira/browse/FLINK-28305
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.5
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-06-29T19:14:39.0384925Z Jun 29 19:14:38 java.lang.RuntimeException: An 
> error occurred in ElasticsearchSink.
> 2022-06-29T19:14:39.0385547Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>  ~[?:?]
> 2022-06-29T19:14:39.0386327Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
>  ~[?:?]
> 2022-06-29T19:14:39.0387078Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
>  ~[?:?]
> 2022-06-29T19:14:39.0388071Z Jun 29 19:14:38  at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
>  ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0389189Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0390354Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0391515Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0392712Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0393822Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:495)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0394861Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0395885Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0396858Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0397824Z Jun 29 19:14:38  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0398882Z Jun 29 19:14:38  at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0400103Z Jun 29 19:14:38  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0401000Z Jun 29 19:14:38  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> 2022-06-29T19:14:39.0401575Z Jun 29 19:14:38  at 
> java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]
> 2022-06-29T19:14:39.0402051Z Jun 29 19:14:38  Suppressed: 
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> 2022-06-29T19:14:39.0402682Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>  ~[?:?]
> 2022-06-29T19:14:39.0403421Z Jun 29 19:14:38  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:366)
>  ~[?:?]
> 2022-06-29T19:14:39.0404395Z Jun 29 19:14:38  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  

[jira] [Closed] (FLINK-28527) Fail to lateral join with UDTF from Table with timestamp column

2022-07-12 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo closed FLINK-28527.

Resolution: Duplicate

> Fail to lateral join with UDTF from Table with timestamp column
> --
>
> Key: FLINK-28527
> URL: https://issues.apache.org/jira/browse/FLINK-28527
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Priority: Major
>
> The bug can be reproduced with the following test
> {code:python}
> def test_flink(self):
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_descriptor(
> TableDescriptor.for_connector("filesystem")
> .schema(
> Schema.new_builder()
> .column("name", DataTypes.STRING())
> .column("cost", DataTypes.INT())
> .column("distance", DataTypes.INT())
> .column("time", DataTypes.TIMESTAMP(3))
> .watermark("time", "`time` - INTERVAL '60' SECOND")
> .build()
> )
> .format("csv")
> .option("path", "./input.csv")
> .build()
> )
> @udtf(result_types=DataTypes.INT())
> def table_func(row: Row):
> return row.cost + row.distance
> table = table.join_lateral(table_func.alias("cost_times_distance"))
> table.execute().print()
> {code}
> It causes the following exception
> {code:none}
> E   pyflink.util.exceptions.TableException: 
> org.apache.flink.table.api.TableException: Unsupported Python SqlFunction 
> CAST.
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
> E at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> E at scala.collection.Iterator.foreach(Iterator.scala:937)
> E at scala.collection.Iterator.foreach$(Iterator.scala:937)
> E at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> E at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> E at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> E at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> E at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233)
> E at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> E at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
> E at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> E at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
> 

[jira] [Commented] (FLINK-28198) CassandraConnectorITCase fails with timeout

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566106#comment-17566106
 ] 

Huang Xingbo commented on FLINK-28198:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38057=logs=dbe51908-4958-5c8c-9557-e10952d4259d=55d11a16-067d-538d-76a3-4c096a3a8e24

> CassandraConnectorITCase fails with timeout
> ---
>
> Key: FLINK-28198
> URL: https://issues.apache.org/jira/browse/FLINK-28198
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> Jun 22 07:57:37 [ERROR] 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts
>   Time elapsed: 12.067 s  <<< ERROR!
> Jun 22 07:57:37 
> com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1:59915] Timed out waiting for server response
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before completing

2022-07-12 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28529:


Assignee: Yanfei Lei

> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>  failed with CheckpointException: Checkpoint expired before completing
> ---
>
> Key: FLINK-28529
> URL: https://issues.apache.org/jira/browse/FLINK-28529
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] 
> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>   Time elapsed: 617.048 s  <<< ERROR!
> 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
> before completing.
> 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49  at 
> org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125)
> 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-07-12T04:30:49.9932966Z Jul 12 04:30:49  at 
> 

[jira] [Commented] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before completing

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566105#comment-17566105
 ] 

Huang Xingbo commented on FLINK-28529:
--

Hi [~Yanfei Lei], could you help take a look?

> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>  failed with CheckpointException: Checkpoint expired before completing
> ---
>
> Key: FLINK-28529
> URL: https://issues.apache.org/jira/browse/FLINK-28529
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] 
> ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
>   Time elapsed: 617.048 s  <<< ERROR!
> 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
> before completing.
> 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49  at 
> org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125)
> 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-07-12T04:30:49.9932493Z Jul 12 04:30:49  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-07-12T04:30:49.9932966Z Jul 12 

[jira] [Assigned] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28525:


Assignee: luoyuxia

> HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with 
> AssertJMultipleFailuresError
> -
>
> Key: FLINK-28525
> URL: https://issues.apache.org/jira/browse/FLINK-28525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: luoyuxia
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
>   Time elapsed: 16.31 s  <<< FAILURE!
> 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
> org.assertj.core.error.AssertJMultipleFailuresError: 
> 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
> 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
> 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not 
> a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
> 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
> 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
> [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
> 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
> 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27  ...(75 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
> create input splits.
> 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
> 2022-07-12T04:20:27.960Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
> 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
> 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27  ...(79 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
> java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
> file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
> 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27  ...(81 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
> file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27  at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
> 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
> 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
> 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27  ...(4 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed)]
> 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
> requirements but none did:
> 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
> org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 

[jira] [Created] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before completing

2022-07-12 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28529:


 Summary: 
ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
 failed with CheckpointException: Checkpoint expired before completing
 Key: FLINK-28529
 URL: https://issues.apache.org/jira/browse/FLINK-28529
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] 
ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode
  Time elapsed: 617.048 s  <<< ERROR!
2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
before completing.
2022-07-12T04:30:49.9913880Z Jul 12 04:30:49at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2022-07-12T04:30:49.9914606Z Jul 12 04:30:49at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2022-07-12T04:30:49.9915572Z Jul 12 04:30:49at 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125)
2022-07-12T04:30:49.9916483Z Jul 12 04:30:49at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-07-12T04:30:49.9917377Z Jul 12 04:30:49at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-07-12T04:30:49.9918121Z Jul 12 04:30:49at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-07-12T04:30:49.9918788Z Jul 12 04:30:49at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-07-12T04:30:49.9919456Z Jul 12 04:30:49at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-07-12T04:30:49.9920193Z Jul 12 04:30:49at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-07-12T04:30:49.9920923Z Jul 12 04:30:49at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-07-12T04:30:49.9921630Z Jul 12 04:30:49at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-07-12T04:30:49.9922326Z Jul 12 04:30:49at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2022-07-12T04:30:49.9923023Z Jul 12 04:30:49at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2022-07-12T04:30:49.9923708Z Jul 12 04:30:49at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
2022-07-12T04:30:49.9924449Z Jul 12 04:30:49at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-07-12T04:30:49.9925124Z Jul 12 04:30:49at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-07-12T04:30:49.9925912Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-07-12T04:30:49.9926742Z Jul 12 04:30:49at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-07-12T04:30:49.9928142Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-07-12T04:30:49.9928715Z Jul 12 04:30:49at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-07-12T04:30:49.9929311Z Jul 12 04:30:49at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-07-12T04:30:49.9929863Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T04:30:49.9930376Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T04:30:49.9930911Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-07-12T04:30:49.9931441Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-07-12T04:30:49.9931975Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-07-12T04:30:49.9932493Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-07-12T04:30:49.9932966Z Jul 12 04:30:49at 
org.junit.runners.Suite.runChild(Suite.java:128)
2022-07-12T04:30:49.9933427Z Jul 12 04:30:49at 
org.junit.runners.Suite.runChild(Suite.java:27)
2022-07-12T04:30:49.9933911Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T04:30:49.9934424Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T04:30:49.9934951Z Jul 12 04:30:49at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)

[jira] [Updated] (FLINK-28528) Table.getSchema fails on table with watermark

2022-07-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-28528:
---
Description: 
The bug can be reproduced with the following test. The test can pass if we use 
the commented way to define the watermark.
{code:python}
def test_flink_2(self):
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("name", DataTypes.STRING())
.column("cost", DataTypes.INT())
.column("distance", DataTypes.INT())
.column("time", DataTypes.TIMESTAMP(3))
.watermark("time", expr.col("time") - expr.lit(60).seconds)
# .watermark("time", "`time` - INTERVAL '60' SECOND")
.build()
)
.format("csv")
.option("path", "./input.csv")
.build()
)

print(table.get_schema())
{code}
It causes the following exception
{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at java.lang.reflect.Method.invoke(Method.java:498)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E   at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E   at java.lang.Thread.run(Thread.java:748)
{code}

  was:
The bug can be reproduced with the following test. The test can pass if we use 
the commented way to define the watermark.

{code:python}
def test_flink_2(self):
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("name", DataTypes.STRING())
.column("cost", DataTypes.INT())
.column("distance", DataTypes.INT())
.column("time", DataTypes.TIMESTAMP(3))
.watermark("time", expr.col("time") - expr.lit(60).seconds)
# .watermark("time", "`time` - INTERVAL '60' SECOND")
.build()
)
.format("csv")
.option("path", "./input.csv")
.build()
)

print(table.get_schema())
{code}

It causes the following exception

{code:none}
// Some comments here
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 

[jira] [Created] (FLINK-28528) Table.getSchema fails on table with watermark

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28528:
--

 Summary: Table.getSchema fails on table with watermark
 Key: FLINK-28528
 URL: https://issues.apache.org/jira/browse/FLINK-28528
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test. The test can pass if we use 
the commented way to define the watermark.

{code:python}
def test_flink_2(self):
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("name", DataTypes.STRING())
.column("cost", DataTypes.INT())
.column("distance", DataTypes.INT())
.column("time", DataTypes.TIMESTAMP(3))
.watermark("time", expr.col("time") - expr.lit(60).seconds)
# .watermark("time", "`time` - INTERVAL '60' SECOND")
.build()
)
.format("csv")
.option("path", "./input.csv")
.build()
)

print(table.get_schema())
{code}

It causes the following exception

{code:none}
// Some comments here
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at java.lang.reflect.Method.invoke(Method.java:498)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E   at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E   at java.lang.Thread.run(Thread.java:748)
{code}
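
As the description notes, the SQL-string form of the watermark keeps the 
expression string-serializable, so get_schema() succeeds. A minimal sketch of 
that workaround (assuming the same PyFlink imports as the reproduction above):

{code:python}
from pyflink.table import DataTypes, Schema

# Workaround sketch: declare the watermark as a SQL expression string rather
# than an Expression object, so TableSchema can serialize it back to a string.
schema = (
    Schema.new_builder()
    .column("name", DataTypes.STRING())
    .column("cost", DataTypes.INT())
    .column("distance", DataTypes.INT())
    .column("time", DataTypes.TIMESTAMP(3))
    .watermark("time", "`time` - INTERVAL '60' SECOND")
    .build()
)
{code}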




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28267) KafkaSourceLegacyITCase.testBrokerFailure hang on azure

2022-07-12 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-28267:
-

Assignee: Qingsheng Ren

> KafkaSourceLegacyITCase.testBrokerFailure hang on azure
> ---
>
> Key: FLINK-28267
> URL: https://issues.apache.org/jira/browse/FLINK-28267
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Qingsheng Ren
>Priority: Critical
>
> "main" #1 prio=5 os_prio=0 tid=0x7f5ae000b800 nid=0x22c2 waiting on 
> condition [0x7f5ae8398000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xa5565010> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:67)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBrokerFailureTest(KafkaConsumerTestBase.java:1506)
>   at 
> org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testBrokerFailure(KafkaSourceLegacyITCase.java:94)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37247=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=41526



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28527) Fail to lateral join with UDTF from Table with timestamp column

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28527:
--

 Summary: Fail to lateral join with UDTF from Table with timestamp column
 Key: FLINK-28527
 URL: https://issues.apache.org/jira/browse/FLINK-28527
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test


{code:python}
def test_flink(self):
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("name", DataTypes.STRING())
.column("cost", DataTypes.INT())
.column("distance", DataTypes.INT())
.column("time", DataTypes.TIMESTAMP(3))
.watermark("time", "`time` - INTERVAL '60' SECOND")
.build()
)
.format("csv")
.option("path", "./input.csv")
.build()
)

@udtf(result_types=DataTypes.INT())
def table_func(row: Row):
return row.cost + row.distance

table = table.join_lateral(table_func.alias("cost_times_distance"))

table.execute().print()
{code}

It causes the following exception


{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E   at scala.collection.Iterator.foreach(Iterator.scala:937)
E   at scala.collection.Iterator.foreach$(Iterator.scala:937)
E   at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
E   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E   at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E   at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at 

[jira] [Created] (FLINK-28526) Fail to lateral join with UDTF from Table with timestamp column

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28526:
--

 Summary: Fail to lateral join with UDTF from Table with timestamp column
 Key: FLINK-28526
 URL: https://issues.apache.org/jira/browse/FLINK-28526
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test


{code:python}
def test_flink(self):
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("name", DataTypes.STRING())
.column("cost", DataTypes.INT())
.column("distance", DataTypes.INT())
.column("time", DataTypes.TIMESTAMP(3))
.watermark("time", "`time` - INTERVAL '60' SECOND")
.build()
)
.format("csv")
.option("path", "./input.csv")
.build()
)

@udtf(result_types=DataTypes.INT())
def table_func(row: Row):
return row.cost + row.distance

table = table.join_lateral(table_func.alias("cost_times_distance"))

table.execute().print()
{code}

It causes the following exception


{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E   at scala.collection.Iterator.foreach(Iterator.scala:937)
E   at scala.collection.Iterator.foreach$(Iterator.scala:937)
E   at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
E   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E   at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E   at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at 

[jira] [Commented] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566100#comment-17566100
 ] 

Huang Xingbo commented on FLINK-28525:
--

Hi [~luoyuxia] , could you help take a look?

> HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with 
> AssertJMultipleFailuresError
> -
>
> Key: FLINK-28525
> URL: https://issues.apache.org/jira/browse/FLINK-28525
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
>   Time elapsed: 16.31 s  <<< FAILURE!
> 2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
> org.assertj.core.error.AssertJMultipleFailuresError: 
> 2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
> 2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
> 2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not 
> a file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
> 2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
> 2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
> [org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9603766Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
> 2022-07-12T04:20:27.9604403Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
> 2022-07-12T04:20:27.9605039Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
> 2022-07-12T04:20:27.9605902Z Jul 12 04:20:27  ...(75 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
> create input splits.
> 2022-07-12T04:20:27.9606952Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
> 2022-07-12T04:20:27.960Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
> 2022-07-12T04:20:27.9608438Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
> 2022-07-12T04:20:27.9609261Z Jul 12 04:20:27  ...(79 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
> java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
> file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9610503Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2022-07-12T04:20:27.9610998Z Jul 12 04:20:27  at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2022-07-12T04:20:27.9611574Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
> 2022-07-12T04:20:27.9612390Z Jul 12 04:20:27  ...(81 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed),
> 2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
> file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
> 2022-07-12T04:20:27.9613522Z Jul 12 04:20:27  at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
> 2022-07-12T04:20:27.9614119Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
> 2022-07-12T04:20:27.9614731Z Jul 12 04:20:27  at 
> org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
> 2022-07-12T04:20:27.9615535Z Jul 12 04:20:27  ...(4 remaining lines not 
> displayed - this can be changed with 
> Assertions.setMaxStackTraceElementsDisplayed)]
> 2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
> requirements but none did:
> 2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
> 2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
> org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: 
> Fail to create input splits.
> 2022-07-12T04:20:27.9617407Z Jul 12 04:20:27  at 
> 

[jira] [Created] (FLINK-28525) HiveDialectITCase.testTableWithSubDirsInPartitionDir failed with AssertJMultipleFailuresError

2022-07-12 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28525:


 Summary: HiveDialectITCase.testTableWithSubDirsInPartitionDir 
failed with AssertJMultipleFailuresError
 Key: FLINK-28525
 URL: https://issues.apache.org/jira/browse/FLINK-28525
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-07-12T04:20:27.9597185Z Jul 12 04:20:27 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectITCase.testTableWithSubDirsInPartitionDir
  Time elapsed: 16.31 s  <<< FAILURE!
2022-07-12T04:20:27.9598238Z Jul 12 04:20:27 
org.assertj.core.error.AssertJMultipleFailuresError: 
2022-07-12T04:20:27.9598978Z Jul 12 04:20:27 
2022-07-12T04:20:27.9599419Z Jul 12 04:20:27 Multiple Failures (1 failure)
2022-07-12T04:20:27.9600269Z Jul 12 04:20:27 -- failure 1 --
2022-07-12T04:20:27.9601093Z Jul 12 04:20:27 [Any cause contains message 'Not a 
file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=2'] 
2022-07-12T04:20:27.9602679Z Jul 12 04:20:27 Expecting any element of:
2022-07-12T04:20:27.9603145Z Jul 12 04:20:27   
[org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: Fail 
to create input splits.
2022-07-12T04:20:27.9603766Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
2022-07-12T04:20:27.9604403Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
2022-07-12T04:20:27.9605039Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
2022-07-12T04:20:27.9605902Z Jul 12 04:20:27...(75 remaining lines not 
displayed - this can be changed with 
Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9606408Z Jul 12 04:20:27 java.io.IOException: Fail to 
create input splits.
2022-07-12T04:20:27.9606952Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84)
2022-07-12T04:20:27.960Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69)
2022-07-12T04:20:27.9608438Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:158)
2022-07-12T04:20:27.9609261Z Jul 12 04:20:27...(79 remaining lines not 
displayed - this can be changed with 
Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9609899Z Jul 12 04:20:27 
java.util.concurrent.ExecutionException: java.io.IOException: Not a file: 
file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
2022-07-12T04:20:27.9610503Z Jul 12 04:20:27at 
java.util.concurrent.FutureTask.report(FutureTask.java:122)
2022-07-12T04:20:27.9610998Z Jul 12 04:20:27at 
java.util.concurrent.FutureTask.get(FutureTask.java:192)
2022-07-12T04:20:27.9611574Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79)
2022-07-12T04:20:27.9612390Z Jul 12 04:20:27...(81 remaining lines not 
displayed - this can be changed with 
Assertions.setMaxStackTraceElementsDisplayed),
2022-07-12T04:20:27.9612953Z Jul 12 04:20:27 java.io.IOException: Not a 
file: file:/tmp/junit297605250552556431/hive_warehouse/fact_tz/ds=1/hr=1
2022-07-12T04:20:27.9613522Z Jul 12 04:20:27at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
2022-07-12T04:20:27.9614119Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:134)
2022-07-12T04:20:27.9614731Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96)
2022-07-12T04:20:27.9615535Z Jul 12 04:20:27...(4 remaining lines not 
displayed - this can be changed with 
Assertions.setMaxStackTraceElementsDisplayed)]
2022-07-12T04:20:27.9616042Z Jul 12 04:20:27 to satisfy the given assertions 
requirements but none did:
2022-07-12T04:20:27.9616381Z Jul 12 04:20:27 
2022-07-12T04:20:27.9616795Z Jul 12 04:20:27 
org.apache.flink.connectors.hive.FlinkHiveException: java.io.IOException: Fail 
to create input splits.
2022-07-12T04:20:27.9617407Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
2022-07-12T04:20:27.9618034Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:153)
2022-07-12T04:20:27.9618650Z Jul 12 04:20:27at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:117)
2022-07-12T04:20:27.9619455Z Jul 12 04:20:27...(75 remaining lines not 
displayed - this can be changed with 
Assertions.setMaxStackTraceElementsDisplayed)

[jira] [Commented] (FLINK-28524) Translate "User-defined Sources & Sinks"

2022-07-12 Thread hunter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566098#comment-17566098
 ] 

hunter commented on FLINK-28524:


I am very interested in this part; please assign it to me.

> Translate "User-defined Sources & Sinks"
> 
>
> Key: FLINK-28524
> URL: https://issues.apache.org/jira/browse/FLINK-28524
> Project: Flink
>  Issue Type: Improvement
>Reporter: hunter
>Priority: Minor
>
> The file is docs/content.zh/docs/dev/table/sourcesSinks.md
> The link is 
> [用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]





[jira] [Created] (FLINK-28524) Translate "User-defined Sources & Sinks"

2022-07-12 Thread hunter (Jira)
hunter created FLINK-28524:
--

 Summary: Translate "User-defined Sources & Sinks"
 Key: FLINK-28524
 URL: https://issues.apache.org/jira/browse/FLINK-28524
 Project: Flink
  Issue Type: Improvement
Reporter: hunter


The file is docs/content.zh/docs/dev/table/sourcesSinks.md

The link is 
[用户自定义Sources|https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/]





[jira] [Commented] (FLINK-28267) KafkaSourceLegacyITCase.testBrokerFailure hang on azure

2022-07-12 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566096#comment-17566096
 ] 

Huang Xingbo commented on FLINK-28267:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38057=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> KafkaSourceLegacyITCase.testBrokerFailure hang on azure
> ---
>
> Key: FLINK-28267
> URL: https://issues.apache.org/jira/browse/FLINK-28267
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Priority: Critical
>
> "main" #1 prio=5 os_prio=0 tid=0x7f5ae000b800 nid=0x22c2 waiting on 
> condition [0x7f5ae8398000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xa5565010> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:67)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBrokerFailureTest(KafkaConsumerTestBase.java:1506)
>   at 
> org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testBrokerFailure(KafkaSourceLegacyITCase.java:94)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37247=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=41526





[jira] [Created] (FLINK-28523) LocalRecoveryITCase failed with AssertionError

2022-07-12 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28523:


 Summary: LocalRecoveryITCase failed with AssertionError
 Key: FLINK-28523
 URL: https://issues.apache.org/jira/browse/FLINK-28523
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-07-12T02:28:07.2243130Z Jul 12 02:28:07 [ERROR] 
LocalRecoveryITCase.executeTest  Time elapsed: 29.154 s  <<< FAILURE!
2022-07-12T02:28:07.2243760Z Jul 12 02:28:07 java.lang.AssertionError: Job 
completed with illegal application status: UNKNOWN.
2022-07-12T02:28:07.2244333Z Jul 12 02:28:07at 
org.junit.Assert.fail(Assert.java:89)
2022-07-12T02:28:07.2245097Z Jul 12 02:28:07at 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.testSlidingTimeWindow(EventTimeWindowCheckpointingITCase.java:529)
2022-07-12T02:28:07.2245983Z Jul 12 02:28:07at 
org.apache.flink.test.checkpointing.LocalRecoveryITCase.executeTest(LocalRecoveryITCase.java:84)
2022-07-12T02:28:07.2246802Z Jul 12 02:28:07at 
org.apache.flink.test.checkpointing.LocalRecoveryITCase.executeTest(LocalRecoveryITCase.java:66)
2022-07-12T02:28:07.2247680Z Jul 12 02:28:07at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-07-12T02:28:07.2248340Z Jul 12 02:28:07at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-07-12T02:28:07.2249573Z Jul 12 02:28:07at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-07-12T02:28:07.2250788Z Jul 12 02:28:07at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-07-12T02:28:07.2251836Z Jul 12 02:28:07at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-07-12T02:28:07.2253000Z Jul 12 02:28:07at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-07-12T02:28:07.2253985Z Jul 12 02:28:07at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-07-12T02:28:07.2254718Z Jul 12 02:28:07at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-07-12T02:28:07.2255385Z Jul 12 02:28:07at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-07-12T02:28:07.2256041Z Jul 12 02:28:07at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-07-12T02:28:07.2256709Z Jul 12 02:28:07at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-07-12T02:28:07.2257481Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-07-12T02:28:07.2258155Z Jul 12 02:28:07at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-07-12T02:28:07.2258837Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-07-12T02:28:07.2259514Z Jul 12 02:28:07at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-07-12T02:28:07.2260373Z Jul 12 02:28:07at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-07-12T02:28:07.2261087Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T02:28:07.2261755Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T02:28:07.2262398Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-07-12T02:28:07.2263045Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-07-12T02:28:07.2263680Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-07-12T02:28:07.2264302Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-07-12T02:28:07.2265018Z Jul 12 02:28:07at 
org.junit.runners.Suite.runChild(Suite.java:128)
2022-07-12T02:28:07.2265581Z Jul 12 02:28:07at 
org.junit.runners.Suite.runChild(Suite.java:27)
2022-07-12T02:28:07.2266314Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-07-12T02:28:07.2266941Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-07-12T02:28:07.2267694Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-07-12T02:28:07.2268376Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-07-12T02:28:07.2269147Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-07-12T02:28:07.2269785Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-07-12T02:28:07.2270449Z Jul 12 02:28:07at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-07-12T02:28:07.2271039Z Jul 

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

2022-07-12 Thread GitBox


gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919556536


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
 
 this.timeout = timeout;
 
+this.asyncRetryStrategy = asyncRetryStrategy;
+
+this.retryEnabled =
+// construct from utility class
+asyncRetryStrategy != NO_RETRY_STRATEGY
+// construct from api
+|| 
asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
   Should it be
   
   `asyncRetryStrategy != NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent())`?
   
   Otherwise, if a user constructs a new strategy that returns an empty Optional 
from both predicate methods, retries should indeed not be enabled? See the 
sketch below.
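   For reference, a self-contained sketch of the proposed check (the stand-in 
types below are hypothetical and only mirror the predicate API quoted in this 
diff; this is not the actual operator code):
   
   ```java
   import java.util.Collection;
   import java.util.Optional;
   import java.util.function.Predicate;
   
   /** Hypothetical stand-in mirroring the retry predicate API quoted above. */
   interface RetryPredicateSketch<OUT> {
       Optional<Predicate<Collection<OUT>>> resultPredicate();
       Optional<Predicate<Throwable>> exceptionPredicate();
   }
   
   /** Hypothetical stand-in for the retry strategy, reduced to what the check needs. */
   interface RetryStrategySketch<OUT> {
       RetryPredicateSketch<OUT> getRetryPredicate();
   }
   
   final class RetryEnabledCheck {
       /** Stand-in for the NO_RETRY_STRATEGY constant from the utility class. */
       static final RetryStrategySketch<?> NO_RETRY_STRATEGY =
               new RetryStrategySketch<Object>() {
                   @Override
                   public RetryPredicateSketch<Object> getRetryPredicate() {
                       return new RetryPredicateSketch<Object>() {
                           @Override
                           public Optional<Predicate<Collection<Object>>> resultPredicate() {
                               return Optional.empty();
                           }
   
                           @Override
                           public Optional<Predicate<Throwable>> exceptionPredicate() {
                               return Optional.empty();
                           }
                       };
                   }
               };
   
       /** The proposed condition: a non-trivial strategy AND at least one predicate present. */
       static <OUT> boolean isRetryEnabled(RetryStrategySketch<OUT> strategy) {
           if (strategy == NO_RETRY_STRATEGY) {
               return false; // strategy constructed from the utility class: never retry
           }
           RetryPredicateSketch<OUT> p = strategy.getRetryPredicate();
           return p.resultPredicate().isPresent() || p.exceptionPredicate().isPresent();
       }
   }
   ```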
   
   



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -154,6 +190,14 @@ public void setup(
 default:
 throw new IllegalStateException("Unknown async mode: " + 
outputMode + '.');
 }
+if 
(asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
   I'd prefer that we do not leave retryResultPredicate / retryExceptionPredicate 
as null, to avoid possible errors. It could be 
   
   ```
   retryResultPredicate = 
asyncRetryStrategy.getRetryPredicate().resultPredicate()
   .orElse(ignored -> false);
   retryExceptionPredicate = 
asyncRetryStrategy.getRetryPredicate().exceptionPredicate()
   .orElse(ignored -> false);
   ```
   
   Then we could also simplify the test to 
   ```
   boolean satisfy =
   (null != results && retryResultPredicate.test(results))
   || (null != error && retryExceptionPredicate.test(error));
   ```
   
   
   



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
 }
 }
 
+/**
+ * Besides doRetry, cleanup work is done after the retry is fired: the retry
+ * in-flight flag is reset and the retry handler is removed from the
+ * incomplete retry handlers.
+ */
+private void doRetryWithCleanup(RetryableResultHandlerDelegator 
resultHandlerDelegator)
+throws Exception {
+doRetry(resultHandlerDelegator);
+
+// reset retryInFlight for next possible retry
+resultHandlerDelegator.retryInFlight.set(false);
+// remove from incomplete retry handlers
+inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+}
+
+/** Increments the number of attempts and fires the attempt. */
+private void doRetry(RetryableResultHandlerDelegator 
resultHandlerDelegator) throws Exception {
+// increment current attempt number
+resultHandlerDelegator.currentAttempts++;
+
+// fire a new attempt
+userFunction.asyncInvoke(
+resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+resultHandlerDelegator);
+}
+
+/** A delegator that holds the real {@link ResultHandler} to handle retries. */
+private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+private final ResultHandler resultHandler;
+private final ProcessingTimeService processingTimeService;
+
+private ScheduledFuture<?> delayedRetryTimer;
+
+/** Starts from 1; when this entry is created, the first attempt happens. */
+private int currentAttempts = 1;
+
+/**
+ * A guard, similar to ResultHandler.complete, to prevent repeated complete
+ * calls from an ill-written AsyncFunction. This flag indicates a retry is
+ * in flight; new retry requests are rejected while it is true, and it is
+ * reset to false after the retry is fired.
+ */
+private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+public RetryableResultHandlerDelegator(
+StreamRecord<IN> inputRecord,
+ResultFuture<OUT> resultFuture,
+ProcessingTimeService processingTimeService) {
+this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+this.processingTimeService = processingTimeService;
+}
+
+public void registerTimeout(long timeout) {
+resultHandler.registerTimeout(processingTimeService, timeout);
+}
+
+@Override
+public void complete(Collection<OUT> results) {
+Preconditions.checkNotNull(
+results, "Results must not be null, use empty collection 
to emit nothing");
+if (retryEnabled
+&& 

[GitHub] [flink-kubernetes-operator] wangyang0918 merged pull request #306: [FLINK-28364] Add Python Job example using Kubernetes Operator

2022-07-12 Thread GitBox


wangyang0918 merged PR #306:
URL: https://github.com/apache/flink-kubernetes-operator/pull/306





[GitHub] [flink] luoyuxia commented on a diff in pull request #20227: [FLINK-28471]Translate hive_read_write.md into Chinese

2022-07-12 Thread GitBox


luoyuxia commented on code in PR #20227:
URL: https://github.com/apache/flink/pull/20227#discussion_r919570445


##
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##
@@ -102,124 +91,119 @@ FROM hive_table
 
 ```
 
-**Notes**
+**注意**
 
-- Monitor strategy is to scan all directories/files currently in the location 
path. Many partitions may cause performance degradation.
-- Streaming reads for non-partitioned tables requires that each file be 
written atomically into the target directory.
-- Streaming reading for partitioned tables requires that each partition should 
be added atomically in the view of hive metastore. If not, new data added to an 
existing partition will be consumed.
-- Streaming reads do not support watermark grammar in Flink DDL. These tables 
cannot be used for window operators.
+- 监控策略是扫描当前位置路径中的所有目录/文件,分区太多可能导致性能下降。
+- 流读非分区表时要求每个文件应原子地写入目标目录。
+- 流读分区表要求每个分区应该被原子地添加进 Hive metastore 中。如果不是的话,只有添加到现有分区的新数据会被消费。 
+- 流读 Hive 表不支持 Flink DDL 的 watermark 语法。这些表不能被用于窗口算子。
 
-### Reading Hive Views
+### 读取 Hive Views
 
-Flink is able to read from Hive defined views, but some limitations apply:
+Flink 能够读取 Hive 中已经定义的视图。但是也有一些限制:
 
-1) The Hive catalog must be set as the current catalog before you can query 
the view. 
-This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE 
CATALOG ...` in SQL Client.
+1) Hive catalog 必须设置成当前的 catalog 才能查询视图。在 Table API 中使用 
`tableEnv.useCatalog(...)`,或者在 SQL 客户端使用 `USE CATALOG ...` 来改变当前 catalog。
 
-2) Hive and Flink SQL have different syntax, e.g. different reserved keywords 
and literals.
-Make sure the view’s query is compatible with Flink grammar.
+2) Hive 和 Flink SQL 的语法不同, 比如不同的关键字和字面值。确保查询视图与 Flink 语法兼容。

Review Comment:
   ```suggestion
   2) Hive 和 Flink SQL 的语法不同, 比如不同的关键字和字面值。请确保对视图的查询语法与 Flink 语法兼容。
   ```



##
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##
@@ -25,74 +25,63 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Hive Read & Write
+# Hive 读 & 写
 
-Using the `HiveCatalog`, Apache Flink can be used for unified `BATCH` and 
`STREAM` processing of Apache 
-Hive Tables. This means Flink can be used as a more performant alternative to 
Hive’s batch engine,
-or to continuously read and write data into and out of Hive tables to power 
real-time data
-warehousing applications. 
+通过使用 `HiveCatalog`,Apache Flink 可以对 Apache Hive 表做统一的批和流处理。这意味着 Flink 可以成为 
Hive 批处理引擎的一个性能更好的选择,或者连续读写 Hive 表中的数据以支持实时数据仓库应用。
 
-## Reading
+## 读
 
-Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. 
When run as a `BATCH`
-application, Flink will execute its query over the state of the table at the 
point in time when the
-query is executed. `STREAMING` reads will continuously monitor the table and 
incrementally fetch
-new data as it is made available. Flink will read tables as bounded by default.
+Flink 支持以批和流两种模式从 Hive 表中读取数据。批读的时候,Flink 
会基于执行查询时表的状态进行查询。流读时将持续监控表,并在表中新数据可用时进行增量获取,默认情况下,Flink 将以批模式读取数据。
 
-`STREAMING` reads support consuming both partitioned and non-partitioned 
tables. 
-For partitioned tables, Flink will monitor the generation of new partitions, 
and read
-them incrementally when available. For non-partitioned tables, Flink will 
monitor the generation
-of new files in the folder and read new files incrementally.
+流读支持消费分区表和非分区表。对于分区表,Flink 会监控新分区的生成,并且在数据可用的情况下增量获取数据。对于非分区表,Flink 
将监控文件夹中新文件的生成,并增量地读取新文件。
 
 
   
 
-Key
-Default
-Type
-Description
+键
+默认值
+类型
+描述
 
   
   
 
 streaming-source.enable
 false
 Boolean
-Enable streaming source or not. NOTES: Please make sure that each 
partition/file should be written atomically, otherwise the reader may get 
incomplete data.
+是否启动流读。注意:请确保每个分区/文件都应该原子地写入,否则读取不到完整的数据。
 
 
 streaming-source.partition.include
 all
 String
-Option to set the partitions to read, the supported option are 
`all` and `latest`, the `all` means read all partitions; the `latest` means 
read latest partition in order of 'streaming-source.partition.order', the 
`latest` only works` when the streaming hive source table used as temporal 
table. By default the option is `all`.
-Flink supports temporal join the latest hive partition by enabling 
'streaming-source.enable' and setting 'streaming-source.partition.include' to 
'latest', at the same time, user can assign the partition compare order and 
data update interval by configuring following partition-related options.  
+选择读取的分区,可选项为 `all` 和 `latest`,`all` 读取所有分区;`latest` 读取按照 
'streaming-source.partition.order' 排序后的最新分区,`latest` 仅在流模式的 Hive 
源表作为时态表时有效。默认的选项是 `all`。在开启 'streaming-source.enable' 并设置 
'streaming-source.partition.include' 为 'latest' 时,Flink 支持 temporal join 最新的 
Hive 分区,同时,用户可以通过配置分区相关的选项来配置分区比较顺序和数据更新时间间隔。 
 
  
 

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #306: [FLINK-28364] Add Python Job example using Kubernetes Operator

2022-07-12 Thread GitBox


wangyang0918 commented on PR #306:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/306#issuecomment-1182672242

   Merging now.





[GitHub] [flink] sunshineJK commented on pull request #20127: [FLINK-26270] Flink SQL write data to kafka by CSV format , whether d…

2022-07-12 Thread GitBox


sunshineJK commented on PR #20127:
URL: https://github.com/apache/flink/pull/20127#issuecomment-1182668416

   Hello @MartijnVisser @afedulov @PatrickRen, do you have any suggestions 
for the latest submission? Thank you





[GitHub] [flink] gaoyunhaii commented on pull request #20239: [FLINK-27703][core] Extend timeout and add logs for FileChannelManagerImplTest

2022-07-12 Thread GitBox


gaoyunhaii commented on PR #20239:
URL: https://github.com/apache/flink/pull/20239#issuecomment-1182650857

   @flinkbot run azure





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part

2022-07-12 Thread GitBox


rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r919544947


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateHandleID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+public class ChangelogStateBackendLocalHandle implements 
ChangelogStateBackendHandle {
+private static final long serialVersionUID = 1L;
+private static final Logger LOG =
+LoggerFactory.getLogger(ChangelogStateBackendLocalHandle.class);
+private final List<KeyedStateHandle> localMaterialized;
+private final List<ChangelogStateHandle> localNonMaterialized;
+private final ChangelogStateBackendHandleImpl remoteHandle;
+
+public ChangelogStateBackendLocalHandle(
+List<KeyedStateHandle> localMaterialized,
+List<ChangelogStateHandle> localNonMaterialized,
+ChangelogStateBackendHandleImpl remoteHandle) {
+this.localMaterialized = localMaterialized;
+this.localNonMaterialized = localNonMaterialized;
+this.remoteHandle = remoteHandle;
+}
+
+@Override
+public List<KeyedStateHandle> getMaterializedStateHandles() {
+return localMaterialized;
+}
+
+@Override
+public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
+return localNonMaterialized;
+}
+
+@Override
+public long getMaterializationID() {
+return remoteHandle.getMaterializationID();
+}
+
+@Override
+public ChangelogStateBackendHandle rebound(long checkpointId) {
+return remoteHandle.rebound(checkpointId);
+}
+
+public List<KeyedStateHandle> getRemoteMaterializedStateHandles() {
+return remoteHandle.getMaterializedStateHandles();
+}
+
+public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() {
+return remoteHandle.getNonMaterializedStateHandles();
+}
+
+@Override
+public long getCheckpointId() {
+return remoteHandle.getCheckpointId();
+}
+
+@Override
+public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
+remoteHandle.registerSharedStates(stateRegistry, checkpointID);
+}
+
+@Override
+public long getCheckpointedSize() {
+return remoteHandle.getCheckpointedSize();
+}
+
+@Override
+public KeyGroupRange getKeyGroupRange() {
+return remoteHandle.getKeyGroupRange();
+}
+
+@Nullable
+@Override
+public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+throw new UnsupportedOperationException(
+"This is a local state handle for the TM side only.");
+}
+
+@Override
+public StateHandleID getStateHandleId() {
+return remoteHandle.getStateHandleId();
+}
+
+@Override
+public void discardState() throws Exception {}

Review Comment:
   Probably this case is not handled: the last checkpoint(s) were aborted and 
then the job terminated?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part

2022-07-12 Thread GitBox


rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r919537669


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateHandleID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+public class ChangelogStateBackendLocalHandle implements 
ChangelogStateBackendHandle {
+private static final long serialVersionUID = 1L;
+private static final Logger LOG =
+LoggerFactory.getLogger(ChangelogStateBackendLocalHandle.class);
+private final List<KeyedStateHandle> localMaterialized;
+private final List<ChangelogStateHandle> localNonMaterialized;
+private final ChangelogStateBackendHandleImpl remoteHandle;
+
+public ChangelogStateBackendLocalHandle(
+List<KeyedStateHandle> localMaterialized,
+List<ChangelogStateHandle> localNonMaterialized,
+ChangelogStateBackendHandleImpl remoteHandle) {
+this.localMaterialized = localMaterialized;
+this.localNonMaterialized = localNonMaterialized;
+this.remoteHandle = remoteHandle;
+}
+
+@Override
+public List<KeyedStateHandle> getMaterializedStateHandles() {
+return localMaterialized;
+}
+
+@Override
+public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
+return localNonMaterialized;
+}
+
+@Override
+public long getMaterializationID() {
+return remoteHandle.getMaterializationID();
+}
+
+@Override
+public ChangelogStateBackendHandle rebound(long checkpointId) {
+return remoteHandle.rebound(checkpointId);
+}
+
+public List<KeyedStateHandle> getRemoteMaterializedStateHandles() {
+return remoteHandle.getMaterializedStateHandles();
+}
+
+public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() {
+return remoteHandle.getNonMaterializedStateHandles();
+}
+
+@Override
+public long getCheckpointId() {
+return remoteHandle.getCheckpointId();
+}
+
+@Override
+public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
+remoteHandle.registerSharedStates(stateRegistry, checkpointID);
+}
+
+@Override
+public long getCheckpointedSize() {
+return remoteHandle.getCheckpointedSize();
+}
+
+@Override
+public KeyGroupRange getKeyGroupRange() {
+return remoteHandle.getKeyGroupRange();
+}
+
+@Nullable
+@Override
+public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+throw new UnsupportedOperationException(
+"This is a local state handle for the TM side only.");
+}
+
+@Override
+public StateHandleID getStateHandleId() {
+return remoteHandle.getStateHandleId();
+}
+
+@Override
+public void discardState() throws Exception {}

Review Comment:
   :+1: IIUC, the discard logic is correct:
   1. `(Changelog)TaskLocalStateStore` calls this no-op method
   2. `LocalStateRegistry` calls `discardState()` on the "low-level" handle 
(`FileStateHandle`) on checkpoint confirmation
   3. `TaskChangelogRegistry` calls `discardState()` on the "low-level" handle 
(`FileStateHandle`) when the upload is no longer needed
   4. As opposed to "normal" local state, local changelog state is *not* 
discarded on recovery (because of 1); but it will be discarded on the first 
checkpoint confirmation
   
   Can you confirm that?
   
   I found this tricky to understand, so it makes sense to document it 
somewhere, WDYT?
   
   (And also to cover it with tests; see the sketch below.)
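   For illustration, a self-contained sketch of the ownership pattern described 
above (hypothetical names, not the actual Flink registry API), just to pin down 
the intended semantics:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical model: the local handle's discard is a no-op; a registry owns the file. */
   final class OwnershipSketch {
   
       /** Stands in for a low-level handle such as FileStateHandle. */
       static final class FileHandle {
           boolean discarded;
           void discard() { discarded = true; }
       }
   
       /** Local handle: discardState() deliberately does nothing (step 1 above). */
       static final class LocalHandle {
           final FileHandle file;
           LocalHandle(FileHandle file) { this.file = file; }
           void discardState() { /* no-op: the registry owns the file */ }
       }
   
       /** Registry that discards tracked files on checkpoint confirmation (steps 2/3 above). */
       static final class Registry {
           private final List<FileHandle> tracked = new ArrayList<>();
           void track(FileHandle h) { tracked.add(h); }
           void onCheckpointConfirmed() { tracked.forEach(FileHandle::discard); }
       }
   
       public static void main(String[] args) {
           Registry registry = new Registry();
           FileHandle file = new FileHandle();
           LocalHandle local = new LocalHandle(file);
           registry.track(file);
   
           local.discardState();             // no-op, so local state survives recovery (step 4)
           registry.onCheckpointConfirmed(); // the actual deletion happens here
           System.out.println("file discarded = " + file.discarded); // prints: true
       }
   }
   ```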






[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part

2022-07-12 Thread GitBox


rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r919521941


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link 
DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into 
both streams. The
+ * difference is that this stream does not delete the file when {@link #close()} is called.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+private static final Logger LOG = 
LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+private final OutputStream secondaryStream;
+
+public DuplicatingOutputStreamWithPos(OutputStream primaryStream, 
OutputStream secondaryStream)
+throws IOException {
+super(primaryStream);
+this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+}
+
+@Override
+public void write(int b) throws IOException {
+outputStream.write(b);
+try {
+secondaryStream.write(b);
+} catch (Exception ex) {
+LOG.warn("Exception encountered during write to secondary stream");
+}

Review Comment:
   if this class remains: I think such error handling might result in data loss 
on recovery.
   
   ditto: the other methods. See the sketch below.
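   A minimal sketch (illustrative only, not the Flink implementation) of one 
way to avoid a silently truncated secondary copy: remember the failure so that 
recovery can tell whether the local copy is complete.
   
   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   
   /** Hypothetical duplicating stream that tracks secondary-stream failures. */
   class FailureTrackingDuplicatingStream extends OutputStream {
       private final OutputStream primary;
       private final OutputStream secondary;
       private IOException secondaryFailure; // null while the local copy is intact
   
       FailureTrackingDuplicatingStream(OutputStream primary, OutputStream secondary) {
           this.primary = primary;
           this.secondary = secondary;
       }
   
       @Override
       public void write(int b) throws IOException {
           primary.write(b); // a primary failure still fails the checkpoint
           if (secondaryFailure == null) {
               try {
                   secondary.write(b);
               } catch (IOException e) {
                   // Stop writing the local copy and mark it unusable rather than
                   // leaving a silently incomplete file behind. A real implementation
                   // would do the same in write(byte[], int, int), flush(), and close().
                   secondaryFailure = e;
               }
           }
       }
   
       /** Local recovery should only use the secondary copy if this returns true. */
       boolean isSecondaryIntact() {
           return secondaryFailure == null;
       }
   }
   ```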






[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part

2022-07-12 Thread GitBox


rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r919521374


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link 
DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into 
both streams. The
+ * difference is that this stream does not delete the file when {@link 
#close()}.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {

Review Comment:
   Is this class necessary? Can't the existing 
`DuplicatingCheckpointOutputStream` be used instead?



##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link 
DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into 
both streams. The
+ * difference is that this stream does not delete the file when {@link #close()} is called.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+private static final Logger LOG = 
LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+private final OutputStream secondaryStream;
+
+public DuplicatingOutputStreamWithPos(OutputStream primaryStream, 
OutputStream secondaryStream)
+throws IOException {
+super(primaryStream);
+this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+}
+
+@Override
+public void write(int b) throws IOException {
+outputStream.write(b);
+try {
+secondaryStream.write(b);
+} catch (Exception ex) {
+LOG.warn("Exception encountered during write to secondary stream");
+}

Review Comment:
   if this class remains: I think such error handling might result in data loss 
on recovery.
   
   ditto: the other methods



##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##
@@ -19,50 +19,30 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import 

[GitHub] [flink-kubernetes-operator] tedhtchang commented on pull request #313: [FLINK-27852][docs] OLM installation and development documentation

2022-07-12 Thread GitBox


tedhtchang commented on PR #313:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/313#issuecomment-1182614400

   @mbalassi Done! Also added some info about the webhook certificate.





[GitHub] [flink] flinkbot commented on pull request #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file

2022-07-12 Thread GitBox


flinkbot commented on PR #20258:
URL: https://github.com/apache/flink/pull/20258#issuecomment-1182611408

   
   ## CI report:
   
   * 455af0ef4247c0f4fc6751c16fc483750ad703de UNKNOWN
   
   
   Bot commands: the @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file

2022-07-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28522:
---
Labels: pull-request-available  (was: )

> [JUnit5 Migration] Module: flink-sequence-file
> --
>
> Key: FLINK-28522
> URL: https://issues.apache.org/jira/browse/FLINK-28522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] RyanSkraba opened a new pull request, #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file

2022-07-12 Thread GitBox


RyanSkraba opened a new pull request, #20258:
URL: https://github.com/apache/flink/pull/20258

   
   
   ## What is the purpose of the change
   
   Update the `flink-formats/flink-sequence-file` module to AssertJ and JUnit 5 
following the [JUnit 5 Migration 
Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit)
   
   ## Brief change log
   
   * Removed dependencies on JUnit 4, JUnit 5 Assertions, and Hamcrest where 
possible.
   
   ## Verifying this change
   
   This change is a code cleanup without any test coverage.
   
   I verified that there were 4 tests run before and after the change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no





[jira] [Created] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file

2022-07-12 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-28522:
---

 Summary: [JUnit5 Migration] Module: flink-sequence-file
 Key: FLINK-28522
 URL: https://issues.apache.org/jira/browse/FLINK-28522
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Ryan Skraba








[jira] [Commented] (FLINK-28522) [JUnit5 Migration] Module: flink-sequence-file

2022-07-12 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566073#comment-17566073
 ] 

Ryan Skraba commented on FLINK-28522:
-

Can this be assigned to me please?

> [JUnit5 Migration] Module: flink-sequence-file
> --
>
> Key: FLINK-28522
> URL: https://issues.apache.org/jira/browse/FLINK-28522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Ryan Skraba
>Priority: Major
>






[jira] [Updated] (FLINK-27918) SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation failed with Expected RuntimeException after snapshot creation

2022-07-12 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27918:
---
Labels: stale-critical test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither itself nor its Sub-Tasks have been 
updated for 14 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.


> SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation failed with 
> Expected RuntimeException after snapshot creation
> ---
>
> Key: FLINK-27918
> URL: https://issues.apache.org/jira/browse/FLINK-27918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: stale-critical, test-stability
>
> {code:java}
> 2022-06-06T03:13:54.0165829Z Jun 06 03:13:54 [ERROR] 
> org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation
>   Time elapsed: 0.242 s  <<< ERROR!
> 2022-06-06T03:13:54.0167256Z Jun 06 03:13:54 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkException: Stop with savepoint operation could not 
> be completed.
> 2022-06-06T03:13:54.0173825Z Jun 06 03:13:54  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-06-06T03:13:54.0174662Z Jun 06 03:13:54  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-06-06T03:13:54.0180645Z Jun 06 03:13:54  at 
> org.apache.flink.test.checkpointing.SavepointITCase.testStopWithFailingSourceInOnePipeline(SavepointITCase.java:1175)
> 2022-06-06T03:13:54.0181702Z Jun 06 03:13:54  at 
> org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointFailingAfterSnapshotCreation(SavepointITCase.java:1020)
> 2022-06-06T03:13:54.0182472Z Jun 06 03:13:54  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-06-06T03:13:54.0184012Z Jun 06 03:13:54  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-06-06T03:13:54.0185109Z Jun 06 03:13:54  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-06-06T03:13:54.0185907Z Jun 06 03:13:54  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-06-06T03:13:54.0187049Z Jun 06 03:13:54  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-06-06T03:13:54.0188081Z Jun 06 03:13:54  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-06-06T03:13:54.0189241Z Jun 06 03:13:54  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-06-06T03:13:54.0190002Z Jun 06 03:13:54  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-06-06T03:13:54.0190704Z Jun 06 03:13:54  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-06-06T03:13:54.0191400Z Jun 06 03:13:54  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-06-06T03:13:54.0192051Z Jun 06 03:13:54  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-06-06T03:13:54.0192883Z Jun 06 03:13:54  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-06-06T03:13:54.0194154Z Jun 06 03:13:54  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-06-06T03:13:54.0195096Z Jun 06 03:13:54  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-06-06T03:13:54.0196226Z Jun 06 03:13:54  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-06-06T03:13:54.0197088Z Jun 06 03:13:54  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-06-06T03:13:54.0198037Z Jun 06 03:13:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-06-06T03:13:54.0199186Z Jun 06 03:13:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-06-06T03:13:54.0200147Z Jun 06 03:13:54  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-06-06T03:13:54.0200956Z Jun 06 03:13:54  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-06-06T03:13:54.0201666Z Jun 06 03:13:54  at 
> 

[jira] [Updated] (FLINK-26907) RMQSourceITCase failed on azure due to ContainerLaunchException: Container startup failed

2022-07-12 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-26907:
---
  Labels: auto-deprioritized-major test-stability  (was: stale-major 
test-stability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> RMQSourceITCase failed on azure due to ContainerLaunchException: Container 
> startup failed
> -
>
> Key: FLINK-26907
> URL: https://issues.apache.org/jira/browse/FLINK-26907
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / RabbitMQ
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2-03-28T09:41:01.3374229Z Mar 28 09:41:01 [ERROR] Tests run: 1, Failures: 0, 
> Errors: 1, Skipped: 0, Time elapsed: 91.834 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
> 2022-03-28T09:41:01.3375722Z Mar 28 09:41:01 [ERROR] 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase  Time elapsed: 
> 91.834 s  <<< ERROR!
> 2022-03-28T09:41:01.3376743Z Mar 28 09:41:01 
> org.testcontainers.containers.ContainerLaunchException: Container startup 
> failed
> 2022-03-28T09:41:01.3378470Z Mar 28 09:41:01  at 
> org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
> 2022-03-28T09:41:01.3379355Z Mar 28 09:41:01  at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
> 2022-03-28T09:41:01.3380117Z Mar 28 09:41:01  at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
> 2022-03-28T09:41:01.3381076Z Mar 28 09:41:01  at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> 2022-03-28T09:41:01.3382198Z Mar 28 09:41:01  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-03-28T09:41:01.3383575Z Mar 28 09:41:01  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-03-28T09:41:01.3384717Z Mar 28 09:41:01  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-03-28T09:41:01.3385671Z Mar 28 09:41:01  at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 2022-03-28T09:41:01.3386611Z Mar 28 09:41:01  at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> 2022-03-28T09:41:01.3387691Z Mar 28 09:41:01  at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> 2022-03-28T09:41:01.3388981Z Mar 28 09:41:01  at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> 2022-03-28T09:41:01.3390250Z Mar 28 09:41:01  at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> 2022-03-28T09:41:01.3391619Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
> 2022-03-28T09:41:01.3393437Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> 2022-03-28T09:41:01.3394826Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> 2022-03-28T09:41:01.3396333Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> 2022-03-28T09:41:01.3397800Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> 2022-03-28T09:41:01.3399166Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> 2022-03-28T09:41:01.3400315Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> 2022-03-28T09:41:01.3401636Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> 2022-03-28T09:41:01.3403403Z Mar 28 09:41:01  at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> 2022-03-28T09:41:01.3404823Z Mar 28 09:41:01  at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> 2022-03-28T09:41:01.3406517Z Mar 28 09:41:01  at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> 2022-03-28T09:41:01.3407936Z Mar 28 09:41:01  at 
> 
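For container startup flakiness like the above, a common mitigation is to let Testcontainers retry the startup and to widen the startup timeout on slow CI agents. A minimal sketch, assuming the test can use Testcontainers' RabbitMQContainer with a JUnit 4 class rule; the image tag and rule wiring here are assumptions for illustration, not the actual RMQSourceITCase code:

{code:java}
import org.junit.ClassRule;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;

public class RMQSourceITCaseSetupSketch {

    // GenericContainer implements TestRule, so it can be used as a @ClassRule.
    @ClassRule
    public static final RabbitMQContainer RMQ_CONTAINER =
            new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.9-management"))
                    // Retry container startup a few times before failing the run.
                    .withStartupAttempts(3)
                    // Give slow CI agents more time before declaring failure.
                    .withStartupTimeout(Duration.ofMinutes(2));
}
{code}

withStartupAttempts and withStartupTimeout are standard GenericContainer knobs; they do not fix a genuinely broken environment, but they absorb transient Docker daemon or registry hiccups that otherwise surface as ContainerLaunchException.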

[jira] [Updated] (FLINK-26772) Application and Job Mode does not wait for job cleanup during shutdown

2022-07-12 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-26772:
---
  Labels: auto-deprioritized-critical pull-request-available  (was: 
pull-request-available stale-critical)
Priority: Major  (was: Critical)

This issue was labeled "stale-critical" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.


> Application and Job Mode does not wait for job cleanup during shutdown
> --
>
> Key: FLINK-26772
> URL: https://issues.apache.org/jira/browse/FLINK-26772
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0
>Reporter: Mika Naylor
>Priority: Major
>  Labels: auto-deprioritized-critical, pull-request-available
> Attachments: FLINK-26772.standalone-job.log, 
> testcluster-599f4d476b-bghw5_log.txt
>
>
> We discovered that in Application Mode, when the application has completed, 
> the cluster is shut down even if there are ongoing resource cleanup events 
> happening in the background. For example, if HA cleanup fails, further 
> retries are not attempted, as the cluster is shut down before they can happen.
>  
> We should also add a flag for the shutdown that will prevent further jobs 
> from being submitted.
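The fix amounts to sequencing shutdown after cleanup rather than running them concurrently. A minimal sketch of that ordering with CompletableFuture; jobTerminationFuture, cleanupJobResources, and shutDownCluster are hypothetical stand-ins for the dispatcher/cluster-entrypoint hooks, not the actual Flink internals:

{code:java}
import java.util.concurrent.CompletableFuture;

public class ShutdownAfterCleanupSketch {

    // Hypothetical hooks; the real dispatcher internals differ. They only
    // illustrate the ordering this ticket asks for.
    static CompletableFuture<Void> jobTerminationFuture() {
        return CompletableFuture.runAsync(() -> System.out.println("job terminated"));
    }

    static CompletableFuture<Void> cleanupJobResources() {
        return CompletableFuture.runAsync(() -> System.out.println("HA cleanup done"));
    }

    static void shutDownCluster() {
        System.out.println("cluster shutdown triggered");
    }

    public static void main(String[] args) {
        jobTerminationFuture()
                // Start resource cleanup only once the job has terminated.
                .thenCompose(ignored -> cleanupJobResources())
                // Trigger cluster shutdown strictly after cleanup completed
                // (or exhausted its retries), never concurrently with it.
                .whenComplete((ignored, error) -> shutDownCluster())
                .join();
    }
}
{code}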



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15123) remove uniqueKeys from FlinkStatistic in blink planner

2022-07-12 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-15123:
---
Labels: stale-major starter  (was: starter)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be 
deprioritized.


> remove uniqueKeys from FlinkStatistic in blink planner 
> ---
>
> Key: FLINK-15123
> URL: https://issues.apache.org/jira/browse/FLINK-15123
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Major
>  Labels: stale-major, starter
> Attachments: b_5.txt
>
>
> {{uniqueKeys}} is a kind of constraint, so it is unreasonable to treat 
> {{uniqueKeys}} as a kind of statistic. We should therefore remove uniqueKeys 
> from {{FlinkStatistic}} in the blink planner. Some temporary solutions (e.g. 
> {{RichTableSourceQueryOperation}}) should also be resolved after primaryKey 
> is introduced in {{TableSchema}} 
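For context, once primary keys live on the schema, uniqueness is declared as a constraint there instead of being fed into FlinkStatistic as a uniqueKeys set. A minimal sketch against the TableSchema builder API of that era (column names are invented for illustration; the planner plumbing is omitted):

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class PrimaryKeyOnSchemaSketch {

    public static void main(String[] args) {
        // Declare uniqueness as a schema constraint; primary key columns
        // must be declared NOT NULL for the constraint to validate.
        TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.BIGINT().notNull())
                .field("name", DataTypes.STRING())
                .primaryKey("id")
                .build();

        System.out.println(schema);
    }
}
{code}

With the constraint on the schema, FlinkStatistic can stay what its name says it is: a container for statistics such as row counts and column stats, with no constraint information mixed in.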



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

