[jira] [Commented] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621521#comment-17621521
 ] 

luoyuxia commented on FLINK-29712:
--

[~macdoor615] Which mode do you run Flink in? Application mode, session mode, or something else?

> The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 
> 1.16.0-rc2
> 
>
> Key: FLINK-29712
> URL: https://issues.apache.org/jira/browse/FLINK-29712
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0-rc2
> Hive 3.1.3
> Hadoop 3.3.4
>Reporter: macdoor615
>Priority: Blocker
> Fix For: 1.16.0
>
> Attachments: flink-conf.yaml
>
>
> All my batch jobs have failed with the same error. All streaming jobs work fine.
> {code:java}
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
> backoffTimeMS=6)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299)
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:635)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: p_hswtv[4] -> Calc[5]' (operator 
> 6cdc5bb954874d922eaee11a8e7b5dd5).
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617)
>     at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
>     at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:360)
>     at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:217)
>     at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315)
>     at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java

[jira] [Commented] (FLINK-29328) 【Flink is having problems using the status expiration setting】

2022-10-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621520#comment-17621520
 ] 

Yun Tang commented on FLINK-29328:
--

First of all, the problem would only occur during state restore. The state 
descriptors (quantityJudgeStateDescriptor and rateAlgorithmStateProperties) you 
provided are completely different, so they are bound to run into the migration 
problem due to incompatible serializers.

> 【Flink is having problems using the status expiration setting】
> --
>
> Key: FLINK-29328
> URL: https://issues.apache.org/jira/browse/FLINK-29328
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3
> Environment: !报错1.jpg!!报错2.jpg!
>Reporter: Jason
>Priority: Minor
> Attachments: 报错1.jpg, 报错2.jpg
>
>
> I am using Flink 1.14.3. When I added TTL settings to a Flink job for the 
> first time and then started it, the job later failed during an automatic 
> recovery with the error below (see the attached pictures). The eventual fix 
> was to change how the state descriptor is created, as follows:
>  * Before (the version that caused the error):
> {code:java}
> public static final MapStateDescriptor<String, Integer> 
> quantityJudgeStateDescriptor = new MapStateDescriptor<>(
>         "quantityJudgeMapState",
>         String.class,
>         Integer.class); {code}
>  * After (the version that fixed it):
> {code:java}
> public static final MapStateDescriptor<String, Integer> 
> rateAlgorithmStateProperties = new MapStateDescriptor<>(
>         "rateAlgorithmMapState",
>         TypeInformation.of(new TypeHint<String>() {
>         }),
>         TypeInformation.of(new TypeHint<Integer>() {
>         })
> ); {code}
> After this change, the problem no longer appeared in testing. I don't know 
> whether this is a bug; I'm raising it here so the root cause can be traced.
>  
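For context, a minimal sketch of how a TTL setting like the one described above 
is attached to a map-state descriptor through Flink's public API. The concrete 
TTL value and update type here are illustrative assumptions, not taken from the 
report:

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlDescriptorSketch {

    // Builds the "after"-style descriptor quoted in the issue and attaches
    // a TTL. The 1-hour TTL and the update type are illustrative assumptions.
    public static MapStateDescriptor<String, Integer> build() {
        MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("rateAlgorithmMapState", Types.STRING, Types.INT);

        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.hours(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}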





[GitHub] [flink] fredia commented on pull request #20965: [FLINK-29244][state/changelog] Add metric lastMaterializationDuration…

2022-10-20 Thread GitBox


fredia commented on PR #20965:
URL: https://github.com/apache/flink/pull/20965#issuecomment-1286536164

   Hi @zoltar9264, I think it's a little confusing to mix `successed` and 
`failed` together. In fact, the duration of a failed materialization is 
meaningless, so I'd prefer to keep `lastDurationOfMaterialization` consistent 
with `lastIncSizeOfMaterialization`/`lastFullSizeOfMaterialization`. What do 
you think?





[jira] [Closed] (FLINK-29700) Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29700.

Resolution: Fixed

release-0.2: aa6846089afc61e6715a7d50ae40e6bb9d8efc0f

> Serializer to BinaryInMemorySortBuffer is wrong
> ---
>
> Key: FLINK-29700
> URL: https://issues.apache.org/jira/browse/FLINK-29700
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.2
>
>
> In SortBufferMemTable, 
> `BinaryInMemorySortBuffer.createBuffer(BinaryRowDataSerializer serializer)` is 
> used with a serializer for the full row, not just the sort-key fields.
> Problems may occur when there are many fields.





[GitHub] [flink-table-store] JingsongLi merged pull request #325: [FLINK-29700] Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread GitBox


JingsongLi merged PR #325:
URL: https://github.com/apache/flink-table-store/pull/325





[jira] [Commented] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621510#comment-17621510
 ] 

Yun Tang commented on FLINK-29577:
--

[~cailiuyang] I'm not sure whether fast disk performance could mitigate the 
unnecessary storage impact, especially considering that the write-ahead log is 
flushed sequentially.

If we can really avoid generating the write-ahead log, I think this PR 
deserves to be merged (it should include a unit test proving that the write 
options disable the WAL).

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-20-16-08-15-746.png, 
> image-2022-10-20-16-11-04-211.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation don't pass a RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restoring kv-data, so RocksDBWriteBatchWrapper's 
> default WriteOptions is used (which doesn't disable the RocksDB WAL 
> explicitly, see the code below). As a result, during a restore from a full 
> snapshot the WAL is enabled, which uses more disk and may affect RocksDB 
> write performance while restoring.
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
> // pass WriteOptions to RocksDBWriteBatchWrapper (null as default)
> private void restoreKVStateData(
>         ThrowingIterator<KeyGroup> keyGroups,
>         Map<Integer, ColumnFamilyHandle> columnFamilies,
>         Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
>         throws IOException, RocksDBException, StateMigrationException {
>     // for all key-groups in the current state handle...
>     try (RocksDBWriteBatchWrapper writeBatchWrapper =
>             new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
>         HeapPriorityQueueSnapshotRestoreWrapper<?> restoredPQ = null;
>         ColumnFamilyHandle handle = null;
>         ..
>     }
> }
>
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal
> // explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
>     if (options != null) {
>         db.write(options, batch);
>     } else {
>         // use the default WriteOptions, if wasn't provided.
>         try (WriteOptions writeOptions = new WriteOptions()) {
>             db.write(writeOptions, batch);
>         }
>     }
>     batch.clear();
> }
> {code}
>  
>  
> As we know, RocksDB's WAL is useless for Flink, so I think we can disable the 
> WAL in RocksDBWriteBatchWrapper's default WriteOptions.
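A minimal sketch of the proposed change, assuming the standard RocksDB Java API 
({{WriteOptions#setDisableWAL}}); the method structure mirrors the flush() 
snippet quoted above:

{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

// Sketch only: disable the WAL on the fallback path of
// RocksDBWriteBatchWrapper#flush(). Flink restores from its own snapshots,
// so the RocksDB WAL adds no durability here -- skipping it saves disk I/O.
public void flush() throws RocksDBException {
    if (options != null) {
        db.write(options, batch);
    } else {
        try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
            db.write(writeOptions, batch);
        }
    }
    batch.clear();
}
{code}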





[jira] [Commented] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621506#comment-17621506
 ] 

Weijie Guo commented on FLINK-29712:


cc [~luoyuxia]

> The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 
> 1.16.0-rc2
> 
>
> Key: FLINK-29712
> URL: https://issues.apache.org/jira/browse/FLINK-29712
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0-rc2
> Hive 3.1.3
> Hadoop 3.3.4
>Reporter: macdoor615
>Priority: Blocker
> Fix For: 1.16.0
>
> Attachments: flink-conf.yaml
>
>
> All my batch jobs have failed with the same error. All streaming jobs work fine.

[jira] [Updated] (FLINK-29702) Add merge tree reader and writer micro benchmarks

2022-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29702:
---
Labels: pull-request-available  (was: )

> Add merge tree reader and writer micro benchmarks
> -
>
> Key: FLINK-29702
> URL: https://issues.apache.org/jira/browse/FLINK-29702
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0, table-store-0.2.2
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
>
> Add merge tree reader and writer micro benchmarks





[GitHub] [flink-table-store] zjureel opened a new pull request, #326: [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks

2022-10-20 Thread GitBox


zjureel opened a new pull request, #326:
URL: https://github.com/apache/flink-table-store/pull/326

   In this PR we create the `flink-table-store-micro-benchmarks` module in 
`flink-table-store-benchmark` and add merge tree reader/writer benchmarks:
   
   1. In `MergeTreeReaderBenchmark` we first create a writer that writes about 
50*5 records to the store, then measure the latency of a scan in the reader. 
   2. In `MergeTreeWriterBenchmark` we measure the write throughput of the 
merge tree with compaction.
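   For readers unfamiliar with the harness, here is a minimal JMH-style 
skeleton of the two measurements described above; `writeRecords`/`scanAll` are 
hypothetical stand-ins for the table-store writer and reader calls, not the 
actual benchmark code:

```java
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class MergeTreeBenchmarkSketch {

    // Hypothetical stand-ins for the merge-tree writer/reader under test.
    void writeRecords(int count) { /* write `count` records via the writer */ }

    void scanAll() { /* scan all written records via the reader */ }

    @Benchmark
    @BenchmarkMode(Mode.AverageTime) // reader benchmark: latency of a full scan
    public void readLatency() {
        scanAll();
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput) // writer benchmark: records written per unit time
    public void writeThroughput() {
        writeRecords(250); // ~50*5 records, matching the PR description
    }
}
```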





[GitHub] [flink-table-store] JingsongLi commented on pull request #325: [FLINK-29700] Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread GitBox


JingsongLi commented on PR #325:
URL: 
https://github.com/apache/flink-table-store/pull/325#issuecomment-1286488493

   Just a cherry-pick from #315.





[jira] [Updated] (FLINK-29700) Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-29700:
-
Fix Version/s: (was: table-store-0.3.0)

> Serializer to BinaryInMemorySortBuffer is wrong
> ---
>
> Key: FLINK-29700
> URL: https://issues.apache.org/jira/browse/FLINK-29700
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.2
>
>
> In SortBufferMemTable, 
> `BinaryInMemorySortBuffer.createBuffer(BinaryRowDataSerializer serializer)` is 
> used with a serializer for the full row, not just the sort-key fields.
> Problems may occur when there are many fields.





[jira] [Updated] (FLINK-29700) Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29700:
---
Labels: pull-request-available  (was: )

> Serializer to BinaryInMemorySortBuffer is wrong
> ---
>
> Key: FLINK-29700
> URL: https://issues.apache.org/jira/browse/FLINK-29700
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0, table-store-0.2.2
>
>
> In SortBufferMemTable, 
> `BinaryInMemorySortBuffer.createBuffer(BinaryRowDataSerializer serializer)` is 
> used with a serializer for the full row, not just the sort-key fields.
> Problems may occur when there are many fields.





[GitHub] [flink-table-store] JingsongLi opened a new pull request, #325: [FLINK-29700] Serializer to BinaryInMemorySortBuffer is wrong

2022-10-20 Thread GitBox


JingsongLi opened a new pull request, #325:
URL: https://github.com/apache/flink-table-store/pull/325

   In SortBufferMemTable, 
`BinaryInMemorySortBuffer.createBuffer(BinaryRowDataSerializer serializer)` is 
used with a serializer for the full row, not just the sort-key fields. 
Problems may occur when there are many fields.
   
   The test does not reproduce the problem because special conditions are 
required to trigger it; the fix simply avoids the problem entirely at the 
level of the serializer parameter passed in the code.
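   To make the shape of the mismatch concrete, a hypothetical sketch (the 
field counts and variable names are illustrative, not the actual patch): 
`BinaryRowDataSerializer` is constructed with a field count, so a serializer 
sized for the full row disagrees with a buffer that stores only the sort-key 
fields.

```java
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;

public class SerializerMismatchSketch {

    // Hypothetical illustration of the mismatch described above, not the patch.
    // BinaryRowDataSerializer(int numFields) serializes binary rows with
    // exactly that many fields.
    static void illustrate() {
        int fullRowFieldCount = 20; // e.g. a wide table row
        int sortKeyFieldCount = 2;  // only the sort-key fields

        // Buggy shape: the sort buffer's serializer is sized for the whole row...
        BinaryRowDataSerializer fullRow = new BinaryRowDataSerializer(fullRowFieldCount);

        // ...while it should be sized for the sort-key fields it actually stores.
        BinaryRowDataSerializer sortKey = new BinaryRowDataSerializer(sortKeyFieldCount);
    }
}
```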





[jira] [Updated] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread macdoor615 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

macdoor615 updated FLINK-29712:
---
Description: 
All my batch jobs have failed with the same error. All streaming jobs work fine.

[jira] [Commented] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621486#comment-17621486
 ] 

macdoor615 commented on FLINK-29712:


Job parameters:

 
{code:java}
SET table.dml-sync = true;
SET execution.runtime-mode = batch;
SET parallelism.default = -1;
SET table.exec.hive.infer-source-parallelism = true;
SET table.exec.hive.infer-source-parallelism.max = 16;
SET taskmanager.network.memory.buffers-per-channel = 0;
SET jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task = 4kb;
SET table.dynamic-table-options.enabled = true;
SET taskmanager.network.memory.buffer-debloat.enabled = true;
SET table.exec.legacy-cast-behaviour=enabled;
{code}
 

> The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 
> 1.16.0-rc2
> 
>
> Key: FLINK-29712
> URL: https://issues.apache.org/jira/browse/FLINK-29712
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0-rc2
> Hive 3.1.3
> Hadoop 3.3.4
>Reporter: macdoor615
>Priority: Blocker
> Fix For: 1.16.0
>
> Attachments: flink-conf.yaml
>
>
> All my tasks have failed with the same error.

[jira] [Updated] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread macdoor615 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

macdoor615 updated FLINK-29712:
---
Description: 
All my tasks have failed with the same error.

[jira] [Updated] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread macdoor615 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

macdoor615 updated FLINK-29712:
---
Attachment: flink-conf.yaml

> The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 
> 1.16.0-rc2
> 
>
> Key: FLINK-29712
> URL: https://issues.apache.org/jira/browse/FLINK-29712
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0-rc2
> Hive 3.1.3
> Hadoop 3.3.4
>Reporter: macdoor615
>Priority: Blocker
> Fix For: 1.16.0
>
> Attachments: flink-conf.yaml
>
>
> All my tasks have failed with the same error.

[jira] [Created] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2

2022-10-20 Thread macdoor615 (Jira)
macdoor615 created FLINK-29712:
--

 Summary: The same batch task works fine in 1.15.2 and 1.16.0-rc1, 
but fails in 1.16.0-rc2
 Key: FLINK-29712
 URL: https://issues.apache.org/jira/browse/FLINK-29712
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.16.0
 Environment: Flink 1.16.0-rc2

Hive 3.1.3

Hadoop 3.3.4
Reporter: macdoor615
 Fix For: 1.16.0


All my tasks have failed with the same error:

{code:java}
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
backoffTimeMS=6)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299)
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:635)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
    at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: p_hswtv[4] -> Calc[5]' (operator 
6cdc5bb954874d922eaee11a8e7b5dd5).
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:360)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:217)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:198)
    at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165)
    at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:605)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1046)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.j

[GitHub] [flink] lindong28 commented on a diff in pull request #20454: [FLINK-28639][Runtime/Checkpointing] Preserve consistency of events from subtask to OC

2022-10-20 Thread GitBox


lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r1001273247


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -207,25 +241,43 @@ public void close() throws Exception {
     public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
             throws Exception {
         mainThreadExecutor.assertRunningInMainThread();
-        if (event instanceof AcknowledgeCheckpointEvent) {
+
+        if (event instanceof AcknowledgeCloseGatewayEvent) {
+            Preconditions.checkArgument(
+                    subtask == ((AcknowledgeCloseGatewayEvent) event).getSubtaskIndex());
+            completeAcknowledgeCloseGatewayFuture(
+                    ((AcknowledgeCloseGatewayEvent) event).getCheckpointID(), subtask);
+            return;
+        } else if (event instanceof OpenGatewayEvent) {
+            Preconditions.checkArgument(subtask == ((OpenGatewayEvent) event).getSubtaskIndex());
             subtaskGatewayMap
                     .get(subtask)
-                    .openGatewayAndUnmarkCheckpoint(
-                            ((AcknowledgeCheckpointEvent) event).getCheckpointID());
+                    .openGatewayAndUnmarkCheckpoint(((OpenGatewayEvent) event).getCheckpointID());
             return;
         }
+
         coordinator.handleEventFromOperator(subtask, attemptNumber, event);
     }
 
     public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
         mainThreadExecutor.assertRunningInMainThread();
+
+        if (!context.isConcurrentExecutionAttemptsSupported()) {
+            abortAcknowledgeCloseGatewayFutures(
+                    x -> x.f1 == subtask, String.format("Subtask %d has failed.", subtask), reason);
+        }
+
         coordinator.executionAttemptFailed(subtask, attemptNumber, reason);
     }
 
     @Override
     public void subtaskReset(int subtask, long checkpointId) {
         mainThreadExecutor.assertRunningInMainThread();
 
+        if (!context.isConcurrentExecutionAttemptsSupported()) {
+            checkNoSuchAcknowledgeCloseGatewayFutures(x -> x.f1 == subtask);

Review Comment:
   Is it possible that `subtaskReset(...)` is invoked after the coordinator 
sends the `closeGatewayEvent` but before the subtask sends the 
`openGatewayEvent`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OpenGatewayEvent.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.Objects;
+
+/**
+ * An {@link OperatorEvent} sent between coordinators and subtasks to notify 
the other size that the

Review Comment:
   size -> side
   
   Can you update the PR description to make it clear when/who will send this 
event?
   
   Can you also update the JIRA description to provide information around 
when/who/which events are sent in this design?



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -397,12 +580,19 @@ public void abortCurrentTriggering() {
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
         mainThreadExecutor.execute(
-                () ->
-                        subtaskGatewayMap
-                                .values()
-                                .forEach(
-                                        SubtaskGatewayImpl
-                                                ::openGatewayAndUnmarkLastCheckpointIfAny));
+                () -> {
+                    abortAcknowledgeCloseGatewayFutures(

Review Comment:
   Just to double check, this method may be invoked without 
`notifyCheckpointAborted()` being invoked for the 
`latestAttemptedCheckpointId`, right?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##
@@ -75,6 +103,36 @@ void dispatchEventToHandlers(
 }
 }
 
+void initializ

[jira] [Commented] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-10-20 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621475#comment-17621475
 ] 

Aitozi commented on FLINK-29557:


ping [~gaoyunhaii] 

> The SinkOperator's OutputFormat function is not recognized
> --
>
> Key: FLINK-29557
> URL: https://issues.apache.org/jira/browse/FLINK-29557
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and 
> registered as a {{SimpleOutputFormatOperatorFactory}}, so the output-format 
> information in {{SinkOperator}} is lost. As a result, hook functions like 
> {{FinalizeOnMaster}} never get a chance to be executed.
> Because {{SinkOperator}} lives in the table module, it can't be reached 
> directly from {{flink-streaming-java}}. So maybe we need to introduce an extra 
> common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator 
> and handle it individually.





[jira] [Resolved] (FLINK-29703) Fail to call unix_timestamp in runtime in Hive dialect

2022-10-20 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-29703.
--
Resolution: Fixed

> Fail to call unix_timestamp  in runtime in Hive dialect 
> 
>
> Key: FLINK-29703
> URL: https://issues.apache.org/jira/browse/FLINK-29703
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
>
> Can be reproduced by the following sql with Hive dialect:
> {code:java}
> select unix_timestamp();{code}





[jira] [Updated] (FLINK-29703) Fail to call unix_timestamp in runtime in Hive dialect

2022-10-20 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-29703:
-
Description: 
Can be reproduced by the following sql with Hive dialect:
{code:java}
select unix_timestamp();{code}

> Fail to call unix_timestamp  in runtime in Hive dialect 
> 
>
> Key: FLINK-29703
> URL: https://issues.apache.org/jira/browse/FLINK-29703
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
>
> Can be reproduced by the following sql with Hive dialect:
> {code:java}
> select unix_timestamp();{code}





[jira] [Commented] (FLINK-28766) UnalignedCheckpointStressITCase.runStressTest failed with NoSuchFileException

2022-10-20 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621470#comment-17621470
 ] 

Xingbo Huang commented on FLINK-28766:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42272&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=10622

> UnalignedCheckpointStressITCase.runStressTest failed with NoSuchFileException
> -
>
> Key: FLINK-28766
> URL: https://issues.apache.org/jira/browse/FLINK-28766
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Anton Kalashnikov
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-08-01T01:36:16.0563880Z Aug 01 01:36:16 [ERROR] 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runStressTest
>   Time elapsed: 12.579 s  <<< ERROR!
> 2022-08-01T01:36:16.0565407Z Aug 01 01:36:16 java.io.UncheckedIOException: 
> java.nio.file.NoSuchFileException: 
> /tmp/junit1058240190382532303/f0f99754a53d2c4633fed75011da58dd/chk-7/61092e4a-5b9a-4f56-83f7-d9960c53ed3e
> 2022-08-01T01:36:16.0566296Z Aug 01 01:36:16  at 
> java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
> 2022-08-01T01:36:16.0566972Z Aug 01 01:36:16  at 
> java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
> 2022-08-01T01:36:16.0567600Z Aug 01 01:36:16  at 
> java.util.Iterator.forEachRemaining(Iterator.java:115)
> 2022-08-01T01:36:16.0568290Z Aug 01 01:36:16  at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> 2022-08-01T01:36:16.0569172Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 2022-08-01T01:36:16.0569877Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 2022-08-01T01:36:16.0570554Z Aug 01 01:36:16  at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> 2022-08-01T01:36:16.0571371Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 2022-08-01T01:36:16.0572417Z Aug 01 01:36:16  at 
> java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:546)
> 2022-08-01T01:36:16.0573618Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.discoverRetainedCheckpoint(UnalignedCheckpointStressITCase.java:289)
> 2022-08-01T01:36:16.0575187Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runAndTakeExternalCheckpoint(UnalignedCheckpointStressITCase.java:262)
> 2022-08-01T01:36:16.0576540Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runStressTest(UnalignedCheckpointStressITCase.java:158)
> 2022-08-01T01:36:16.0577684Z Aug 01 01:36:16  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-08-01T01:36:16.0578546Z Aug 01 01:36:16  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-08-01T01:36:16.0579374Z Aug 01 01:36:16  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-08-01T01:36:16.0580298Z Aug 01 01:36:16  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-08-01T01:36:16.0581243Z Aug 01 01:36:16  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-08-01T01:36:16.0582029Z Aug 01 01:36:16  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-08-01T01:36:16.0582766Z Aug 01 01:36:16  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-08-01T01:36:16.0583488Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-08-01T01:36:16.0584203Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-08-01T01:36:16.0585087Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-08-01T01:36:16.0585778Z Aug 01 01:36:16  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-08-01T01:36:16.0586482Z Aug 01 01:36:16  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-08-01T01:36:16.0587155Z Aug 01 01:36:16  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-08-01T01:36:16.0587809Z Aug 01 01:36:16  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-08-01T01:36:16.0588434Z Aug 01 01:36:16  at 
> org.junit.runners.ParentRunner$

[jira] [Comment Edited] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"

2022-10-20 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621469#comment-17621469
 ] 

Xingbo Huang edited comment on FLINK-24119 at 10/21/22 3:39 AM:


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42266&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]
{code:java}
2022-10-20T08:43:50.6008823Z Oct 20 08:43:50 [ERROR] 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryEventTime
  Time elapsed: 92.728 s  <<< FAILURE!
2022-10-20T08:43:50.6010487Z Oct 20 08:43:50 java.lang.AssertionError: Create 
test topic : partition_failure_recovery_EventTime failed, 
org.apache.kafka.common.errors.TopicExistsException: Topic 
'partition_failure_recovery_EventTime' already exists.
2022-10-20T08:43:50.6011552Z Oct 20 08:43:50at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:207)
2022-10-20T08:43:50.6012448Z Oct 20 08:43:50at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:97)
2022-10-20T08:43:50.6013274Z Oct 20 08:43:50at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:217)
2022-10-20T08:43:50.6014297Z Oct 20 08:43:50at 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecovery(KafkaShuffleExactlyOnceITCase.java:158)
2022-10-20T08:43:50.6015529Z Oct 20 08:43:50at 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryEventTime(KafkaShuffleExactlyOnceITCase.java:101)
 {code}


was (Author: hxb):
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42266&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -
>
> Key: FLINK-24119
> URL: https://issues.apache.org/jira/browse/FLINK-24119
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.16.0
>Reporter: Xintong Song
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: auto-deprioritized-critical, test-stability
> Fix For: 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 162.65 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already 
> exists.
> Sep 01 15:53:20   at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Sep 01 15:53:20   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> S

[jira] [Updated] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"

2022-10-20 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang updated FLINK-24119:
-
Fix Version/s: 1.16.1
   (was: 1.16.0)

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -
>
> Key: FLINK-24119
> URL: https://issues.apache.org/jira/browse/FLINK-24119
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.16.0
>Reporter: Xintong Song
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: auto-deprioritized-critical, test-stability
> Fix For: 1.16.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 162.65 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already 
> exists.
> Sep 01 15:53:20   at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Sep 01 15:53:20   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Sep 01 15:53:20   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 01 15:53:20   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"

2022-10-20 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621469#comment-17621469
 ] 

Xingbo Huang commented on FLINK-24119:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42266&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -
>
> Key: FLINK-24119
> URL: https://issues.apache.org/jira/browse/FLINK-24119
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.16.0
>Reporter: Xintong Song
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: auto-deprioritized-critical, test-stability
> Fix For: 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 162.65 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already 
> exists.
> Sep 01 15:53:20   at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Sep 01 15:53:20   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Sep 01 15:53:20   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 01 15:53:20   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29684) [UI] Upgrade runtime web Angular framework and associated deps to v14

2022-10-20 Thread Junhan Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junhan Yang closed FLINK-29684.
---
Resolution: Fixed

> [UI] Upgrade runtime web Angular framework and associated deps to v14
> -
>
> Key: FLINK-29684
> URL: https://issues.apache.org/jira/browse/FLINK-29684
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Junhan Yang
>Assignee: Junhan Yang
>Priority: Major
>  Labels: pull-request-available
>
> Angular framework and NG-ZORRO both have released their stable v14 versions. 
> It is a good time to bump them to the latest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29684) [UI] Upgrade runtime web Angular framework and associated deps to v14

2022-10-20 Thread Junhan Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621467#comment-17621467
 ] 

Junhan Yang commented on FLINK-29684:
-

master: 97bc331f84285117dc4c30bc583cc2df45196356

> [UI] Upgrade runtime web Angular framework and associated deps to v14
> -
>
> Key: FLINK-29684
> URL: https://issues.apache.org/jira/browse/FLINK-29684
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Junhan Yang
>Assignee: Junhan Yang
>Priority: Major
>  Labels: pull-request-available
>
> Angular framework and NG-ZORRO both have released their stable v14 versions. 
> It is a good time to bump them to the latest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yangjunhan merged pull request #21121: [FLINK-29684] chore: bump runtime web Angular to v14

2022-10-20 Thread GitBox


yangjunhan merged PR #21121:
URL: https://github.com/apache/flink/pull/21121


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29687) Do not throw exception when Lookup table is empty

2022-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29687.

Fix Version/s: table-store-0.2.2
   Resolution: Fixed

master: b54c474e2596271fedfffaec5eddc895e0bb5455
release-0.2: 56c952e059686a5096163356f563548bb1dc3f2e

> Do not throw exception when Lookup table is empty
> -
>
> Key: FLINK-29687
> URL: https://issues.apache.org/jira/browse/FLINK-29687
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.3.0, table-store-0.2.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0, table-store-0.2.2
>
> Attachments: image-2022-10-19-17-44-10-062.png
>
>
> !image-2022-10-19-17-44-10-062.png|width=724,height=431!
> When scanning the Lookup table, it is likely that the snapshot has not been 
> committed at that moment. So it's better to wait for the commit rather than 
> throwing an exception.
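
A minimal sketch of the wait-for-commit idea (plain Java with a hypothetical 
snapshot lookup, not the actual table store API):
{code:java}
import java.util.Optional;
import java.util.function.Supplier;

public class WaitForFirstSnapshotSketch {
    // Poll until the first snapshot is committed instead of failing immediately.
    static long waitForFirstSnapshot(Supplier<Optional<Long>> latestSnapshotId)
            throws InterruptedException {
        Optional<Long> snapshot;
        while (!(snapshot = latestSnapshotId.get()).isPresent()) {
            Thread.sleep(100L); // back off until the writer commits
        }
        return snapshot.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        long id = waitForFirstSnapshot(
                () -> System.currentTimeMillis() - start > 300
                        ? Optional.of(1L)
                        : Optional.<Long>empty());
        System.out.println("first committed snapshot: " + id);
    }
}
{code}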



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yangjunhan commented on pull request #21121: [FLINK-29684] chore: bump runtime web Angular to v14

2022-10-20 Thread GitBox


yangjunhan commented on PR #21121:
URL: https://github.com/apache/flink/pull/21121#issuecomment-1286408461

   The CI failure is unrelated to this PR's changes, so it is ready to merge.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #323: [FLINK-29687] Do not throw exception when Lookup table is empty

2022-10-20 Thread GitBox


JingsongLi merged PR #323:
URL: https://github.com/apache/flink-table-store/pull/323


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29703) Fail to call unix_timestamp in runtime in Hive dialect

2022-10-20 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621463#comment-17621463
 ] 

luoyuxia commented on FLINK-29703:
--

The reason is that the Hive function unix_timestamp calls 
`SessionState.get().getQueryCurrentTimestamp()` at runtime.

But `SessionState.get()` returns null, since the SessionState has already been 
closed in Flink.

Note that `SessionState.get().getQueryCurrentTimestamp()` is actually a fixed 
value set during the query parse phase.

To fix it, we need to convert the function call `unix_timestamp` to a literal 
instead of evaluating it at runtime, since the value is the same whether it is 
converted to a literal or evaluated at runtime.
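
For illustration, here is a minimal sketch of the idea in plain Java (the 
session-timestamp accessor is a stand-in, not the actual planner code):
{code:java}
import java.time.Instant;

public class UnixTimestampFoldingSketch {
    public static void main(String[] args) {
        // Stand-in for SessionState.get().getQueryCurrentTimestamp(), which is
        // fixed once during the query parse phase.
        Instant queryCurrentTimestamp = Instant.now();

        // Fold unix_timestamp() into a literal (seconds since epoch) at parse
        // time, so nothing touches the already-closed SessionState at runtime.
        long unixTimestampLiteral = queryCurrentTimestamp.getEpochSecond();
        System.out.println("SELECT " + unixTimestampLiteral + " -- instead of unix_timestamp()");
    }
}
{code}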

> Fail to call unix_timestamp  in runtime in Hive dialect 
> 
>
> Key: FLINK-29703
> URL: https://issues.apache.org/jira/browse/FLINK-29703
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] HuangXingBo commented on a diff in pull request #21110: [FLINK-29479][python] Supports whether uses system env for python.

2022-10-20 Thread GitBox


HuangXingBo commented on code in PR #21110:
URL: https://github.com/apache/flink/pull/21110#discussion_r1001310191


##
docs/layouts/shortcodes/generated/python_configuration.html:
##
@@ -110,5 +110,11 @@
 Integer
 The maximum number of states cached in a Python UDF worker. 
Note that this is an experimental flag and might not be available in future 
releases.
 
+
+python.systemenv.enabled
+true
+Boolean
+When it is false, system env for Python will be disabled.

Review Comment:
   ```suggestion
   Specify whether to load System Environment when starting 
Python worker.
   ```



##
flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java:
##
@@ -101,9 +104,11 @@ public AbstractPythonEnvironmentManager(
 PythonDependencyInfo dependencyInfo,
 String[] tmpDirectories,
 Map systemEnv,
-JobID jobID) {
+JobID jobID,
+Configuration config) {
 this.dependencyInfo = Objects.requireNonNull(dependencyInfo);
 this.tmpDirectories = Objects.requireNonNull(tmpDirectories);
+this.systemEnvEnabled = 
config.get(PythonOptions.PYTHON_SYSTEMENV_ENABLED);

Review Comment:
   We can move this to `createPythonEnvironmentManager` so that we can decide 
whether systemEnv is an empty Map.



##
flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java:
##
@@ -101,9 +104,11 @@ public AbstractPythonEnvironmentManager(
 PythonDependencyInfo dependencyInfo,
 String[] tmpDirectories,
 Map systemEnv,
-JobID jobID) {
+JobID jobID,
+Configuration config) {

Review Comment:
   We don't need to pass in the config parameter



##
flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java:
##
@@ -42,8 +43,9 @@ public EmbeddedPythonEnvironmentManager(
 PythonDependencyInfo dependencyInfo,
 String[] tmpDirectories,
 Map systemEnv,
-JobID jobID) {
-super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+JobID jobID,
+Configuration config) {

Review Comment:
   ditto



##
flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java:
##
@@ -172,4 +172,20 @@ void testPythonClientExecutable() {
 configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
 
assertThat(actualPythonClientExecutable).isEqualTo(expectedPythonClientExecutable);
 }
+
+@Test
+void testPythonSystemEnvEnabled() {

Review Comment:
   I don't think this test adds much value. If we want to test this, we should 
test whether the `systemEnv` obtained by the operator differs when the config 
is in effect.
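
   A self-contained sketch of the semantics such a test should pin down (plain 
Java with hypothetical names, not the Flink classes under review):
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   public class SystemEnvToggleSketch {
       // When python.systemenv.enabled is false, the Python worker should start
       // from an empty env map instead of inheriting System.getenv().
       static Map<String, String> resolveSystemEnv(boolean systemEnvEnabled) {
           return systemEnvEnabled
                   ? new HashMap<>(System.getenv())
                   : Collections.emptyMap();
       }

       public static void main(String[] args) {
           System.out.println("enabled  -> " + resolveSystemEnv(true).size() + " vars");
           System.out.println("disabled -> " + resolveSystemEnv(false).size() + " vars");
       }
   }
   ```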



##
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##
@@ -83,6 +83,13 @@ public class PythonOptions {
 + "The interval between each profiling is 
determined by the config options "
 + "python.fn-execution.bundle.size and 
python.fn-execution.bundle.time.");
 
+/** The configuration to enable or disable system env for Python 
execution. */
+public static final ConfigOption PYTHON_SYSTEMENV_ENABLED =
+ConfigOptions.key("python.systemenv.enabled")
+.booleanType()
+.defaultValue(true)
+.withDescription("When it is false, system env for Python 
will be disabled.");

Review Comment:
   ditto



##
flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java:
##
@@ -73,6 +75,7 @@ public abstract class AbstractPythonEnvironmentManager 
implements PythonEnvironm
 protected final PythonDependencyInfo dependencyInfo;
 
 private final Map systemEnv;
+private final boolean systemEnvEnabled;

Review Comment:
   we don't need this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-10-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621460#comment-17621460
 ] 

Yun Gao commented on FLINK-29498:
-

[~eric.xiao] Thanks for the PR! I'll have a look

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.2
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>
> We are using the async I/O to make HTTP calls and one of the features we 
> wanted to leverage was the retries, so we pulled the newest commit: 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error, 
> as {{unorderedWaitWithRetry}} expects a Scala retry strategy. The problem is 
> that retry strategies were only implemented in Java, not Scala, in this PR: 
> [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #160: [FLINK-29434] Add AlgoOperator for Splitter

2022-10-20 Thread GitBox


weibozhao commented on code in PR #160:
URL: https://github.com/apache/flink-ml/pull/160#discussion_r1001308391


##
docs/content/docs/operators/feature/splitter.md:
##
@@ -0,0 +1,143 @@
+---
+title: "Splitter"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/splitter.html
+---
+
+
+
+## Splitter
+
+An AlgoOperator which splits a dataset into two datasets according to a given 
fraction.
+
+### Parameters

Review Comment:
   The input and output are not changed by this transformer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-23074) There is a class conflict between flink-connector-hive and flink-parquet

2022-10-20 Thread Kai Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621456#comment-17621456
 ] 

Kai Chen commented on FLINK-23074:
--

[~luoyuxia] After this change, the version of 
org.apache.flink.hive.shaded.parquet has been changed to the parquet-hadoop 
version used by Flink (for example, parquet-hadoop-1.12.2 in Flink 1.15). I 
found this issue when I hit a zstd-jni-not-found problem (parquet-hadoop 1.12 
uses zstd-jni-1.4.9-1, and we did not include zstd-jni-1.4.9-1 in our 
classpath).

Now I have included zstd-jni-1.4.9-1 in our classpath and everything is OK. 
But I am still wondering about the reason for this change. Maybe it relates to 
some vulnerabilities?
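
As a minimal illustration (assuming the standard zstd-jni package name, so 
treat the class name as an assumption), a fail-fast check like this would have 
surfaced the missing dependency at startup:
{code:java}
public class ZstdClasspathCheck {
    public static void main(String[] args) {
        try {
            // Entry class of the zstd-jni artifact that parquet-hadoop 1.12.x
            // depends on.
            Class.forName("com.github.luben.zstd.Zstd");
            System.out.println("zstd-jni found on the classpath");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                    "zstd-jni is missing; add it to the job classpath", e);
        }
    }
}
{code}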

> There is a class conflict between flink-connector-hive and flink-parquet
> 
>
> Key: FLINK-23074
> URL: https://issues.apache.org/jira/browse/FLINK-23074
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Luning Wang
>Assignee: Luning Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.5, 1.13.2
>
> Attachments: E8C394D1-F970-4825-82CD-3EFA74C65B27.png, 
> image-2021-06-23-17-26-32-559.png, image-2021-07-01-18-23-47-105.png, 
> image-2021-07-01-18-40-00-991.png, image-2021-07-01-18-40-31-729.png, 
> screenshot-1.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, 
> screenshot-6.png
>
>
> flink-connector-hive 2.3.6 include parquet-hadoop 1.8.1 version but 
> flink-parquet include 1.11.1.
> org.apache.parquet.hadoop.example.GroupWriteSupport
>  is different.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] clarax commented on a diff in pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…

2022-10-20 Thread GitBox


clarax commented on code in PR #406:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/406#discussion_r1000931100


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointStatus.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;

Review Comment:
   Makes sense. Moved to package org.apache.flink.kubernetes.operator.utils



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29711) Topic notification not present in metadata after 60000 ms.

2022-10-20 Thread Durgesh Mishra (Jira)
Durgesh Mishra created FLINK-29711:
--

 Summary: Topic notification not present in metadata after 60000 ms.
 Key: FLINK-29711
 URL: https://issues.apache.org/jira/browse/FLINK-29711
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.6, 1.14.4
Reporter: Durgesh Mishra


Failed to send data to Kafka null with 
FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, 
closed=false}
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29615) MetricStore does not remove metrics of nonexistent subtasks when adaptive scheduler lowers job parallelism

2022-10-20 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-29615.

Fix Version/s: 1.17.0
   1.16.1
   Resolution: Fixed

- master (1.17): f11c322467aa7e8a6d58703a72149152e8b58883
- release-1.16: 9f5ad7ea6ae04722d3934424f7f37cbfcbf2a7f5

> MetricStore does not remove metrics of nonexistent subtasks when adaptive 
> scheduler lowers job parallelism
> --
>
> Key: FLINK-29615
> URL: https://issues.apache.org/jira/browse/FLINK-29615
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / REST
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> We are exploring autoscaling Flink with Reactive mode using metrics from 
> Flink REST for guidance, and found that the metrics are not correctly updated.
>  
> *Problem*
> MetricStore does not remove metrics of nonexistent subtasks when adaptive 
> scheduler lowers job parallelism (aka, num of subtasks decreases) and users 
> will see metrics of nonexistent subtasks on Web UI (e.g. the task 
> backpressure page) or REST API response. It causes confusion and occupies 
> extra memory.
>  
> *Proposed Solution*
> Thanks to FLINK-29132 & FLINK-28588,  Flink will now update current execution 
> attempts when updating metrics. Since the active subtask info is included in 
> the current execution attempt info, we are able to retain active subtasks 
> using the current execution attempt info.
>  
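
A minimal sketch of the retention idea (plain Java with hypothetical names, 
not the actual MetricStore API):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SubtaskMetricRetentionSketch {
    // Drop metrics of subtasks that are no longer active according to the
    // current execution attempts reported with the latest metric update.
    static void retainActiveSubtasks(
            Map<Integer, String> metricsBySubtaskIndex,
            Set<Integer> activeSubtaskIndexes) {
        metricsBySubtaskIndex.keySet().retainAll(activeSubtaskIndexes);
    }

    public static void main(String[] args) {
        Map<Integer, String> metrics = new HashMap<>();
        metrics.put(0, "busyTimeMsPerSecond=12");
        metrics.put(1, "busyTimeMsPerSecond=7");
        retainActiveSubtasks(metrics, Set.of(0)); // parallelism lowered 2 -> 1
        System.out.println(metrics); // only subtask 0 remains
    }
}
{code}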



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong closed pull request #21051: [FLINK-29615] Retain active subtasks in TaskMetricStore when fetching metrics to accommodate dynamic scaling

2022-10-20 Thread GitBox


xintongsong closed pull request #21051: [FLINK-29615] Retain active subtasks in 
TaskMetricStore when fetching metrics to accommodate dynamic scaling
URL: https://github.com/apache/flink/pull/21051


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29687) Do not throw exception when Lookup table is empty

2022-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29687:
---
Labels: pull-request-available  (was: )

> Do not throw exception when Lookup table is empty
> -
>
> Key: FLINK-29687
> URL: https://issues.apache.org/jira/browse/FLINK-29687
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.3.0, table-store-0.2.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
> Attachments: image-2022-10-19-17-44-10-062.png
>
>
> !image-2022-10-19-17-44-10-062.png|width=724,height=431!
> When scanning the Lookup table, it is likely that the snapshot has not been 
> committed at that moment. So it's better to wait for the commit rather than 
> throwing an exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] LadyForest closed pull request #323: [FLINK-29687] Do not throw exception when Lookup table is empty

2022-10-20 Thread GitBox


LadyForest closed pull request #323: [FLINK-29687] Do not throw exception when 
Lookup table is empty
URL: https://github.com/apache/flink-table-store/pull/323


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on pull request #323: [FLINK-29687] Do not throw exception when Lookup table is empty

2022-10-20 Thread GitBox


LadyForest commented on PR #323:
URL: 
https://github.com/apache/flink-table-store/pull/323#issuecomment-1286364083

   `PreAggregationITCase#testMergeInMemory` is unstable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-20 Thread Canope Nerda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621435#comment-17621435
 ] 

Canope Nerda commented on FLINK-29692:
--

Thanks [~jark] for suggesting the legacy group window agg. So is the last value 
in your example based on rowtime or proctime? Another problem in my case is 
that we require microsecond time precision, but the time attribute only 
supports milliseconds so far.
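
For reference, a minimal sketch of enabling the experimental early-fire 
behavior of the legacy group window aggregation (option keys as I understand 
them; please double-check against the planner docs):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarlyFireSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Experimental emit options for the legacy group window aggregation.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.early-fire.enabled", "true");
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.early-fire.delay", "1 s");
    }
}
{code}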

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late-arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication, which is based on Windowing TVFs, and according to 
> the source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts 2). Without early/late fires, we had to compromise: 
> either live with fresh but incorrect data or tolerate excess latency for 
> correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29674) Apache Kafka Connector‘s “ setBounded” not valid

2022-10-20 Thread hongcha (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432
 ] 

hongcha edited comment on FLINK-29674 at 10/21/22 1:37 AM:
---

[~coderap] this is my test source code:

 
{code:java}
package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

import java.time.Duration;

/** 
 * 
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        
        String jaasTemplate = 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        
        Map<TopicPartition, Long> offsets = new HashMap<>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic,0), 6L);
        
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("xxx:9092")
                .setProperties(properties)
//                .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
//                .setTopicPattern(java.util.regex.Pattern.compile(topic+".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();
        try {
            final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setParallelism(1);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    
.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            WatermarkStrategy<String> wk =
                    org.apache.flink.api.common.eventtime.WatermarkStrategy
                            .<String>forBoundedOutOfOrderness(Duration.ZERO)
                            .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String element, long 
recordTimestamp) {
                            return 
JSONObject.parseObject(element).getInteger("time") * 1000;
                        }
                    });

            SingleOutputStreamOperator<String> fromSource = env
                    .fromSource(source, wk, "Kafka Source");
            fromSource.print("first");
            fromSource.keyBy(data -> 
JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String arg0,
                                ProcessWindowFunction<String, Iterable<String>,
                                        String, TimeWindow>.Context arg1,
                                Iterable<String> datas,
                                Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    }).print("window-data");
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

{code}
 

this is my test data:

 
{code:java}
"{'id':'1

[jira] [Comment Edited] (FLINK-29674) Apache Kafka Connector‘s “ setBounded” not valid

2022-10-20 Thread hongcha (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432
 ] 

hongcha edited comment on FLINK-29674 at 10/21/22 1:15 AM:
---

[~coderap] this is my test source code:

 
{code:java}
package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import com.alibaba.fastjson.JSONObject;

/** 
 * 
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        
        String jaasTemplate = 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        
        Map<TopicPartition, Long> offsets = new HashMap<>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic,0), 6L);
        
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("xxx:9092")
                .setProperties(properties)
//                .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
//                .setTopicPattern(java.util.regex.Pattern.compile(topic+".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();
        try {
            final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setParallelism(1);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    
.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            SingleOutputStreamOperator<String> fromSource = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), 
"Kafka Source");
            fromSource.print("first");
            fromSource.keyBy(data -> 
JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String arg0,
                                ProcessWindowFunction<String, Iterable<String>,
                                        String, TimeWindow>.Context arg1,
                                Iterable<String> datas,
                                Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    }).print("window-data");
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

{code}
 

this is my test data:

 
{code:java}
"{'id':'1','kpi':11,'kpi1':90,'time':1}", 
"{'id':'1','kpi':11,'kpi1':90,'time':2}",
"{'id':'1','kpi':11,'kpi1':90,'time':3}",
"{'id':'1','kpi':11,'kpi1':90,'time':4}",
"{'id':'1','kpi':11,'kpi1':90,'time':5}",
"{'id':'1','kpi':11,'kpi1':90,'time':6}", 
"end"{code}
 

 

this is success result

 
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1

[jira] [Commented] (FLINK-29674) Apache Kafka Connector‘s “ setBounded” not valid

2022-10-20 Thread hongcha (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432
 ] 

hongcha commented on FLINK-29674:
-

[~coderap] this is my test source code:

 
{code:java}
package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import com.alibaba.fastjson.JSONObject;

/** 
 * 
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        
        String jaasTemplate = 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        
        Map<TopicPartition, Long> offsets = new HashMap<>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic,0), 6L);
        
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.8.79:9092")
                .setProperties(properties)
//                .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
//                .setTopicPattern(java.util.regex.Pattern.compile(topic+".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();
        try {
            final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setParallelism(1);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    
.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            SingleOutputStreamOperator<String> fromSource = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), 
"Kafka Source");
            fromSource.print("first");
            fromSource.keyBy(data -> 
JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String arg0,
                                ProcessWindowFunction<String, Iterable<String>,
                                        String, TimeWindow>.Context arg1,
                                Iterable<String> datas,
                                Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    }).print("window-data");
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

{code}
 

this is my test data:

 
{code:java}
"{'id':'1','kpi':11,'kpi1':90,'time':1}", 
"{'id':'1','kpi':11,'kpi1':90,'time':2}",
"{'id':'1','kpi':11,'kpi1':90,'time':3}",
"{'id':'1','kpi':11,'kpi1':90,'time':4}",
"{'id':'1','kpi':11,'kpi1':90,'time':5}",
"{'id':'1','kpi':11,'kpi1':90,'time':6}", 
"end"{code}
 

 

this is success result

 
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","ti

[jira] [Closed] (FLINK-29701) Refactor flink-table-store-benchmark and create micro benchmarks module

2022-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29701.

Fix Version/s: table-store-0.3.0
 Assignee: Shammon
   Resolution: Fixed

master: cc6d29a464a8790324ef961722e2d822dea6dd33

> Refactor flink-table-store-benchmark and create micro benchmarks module
> ---
>
> Key: FLINK-29701
> URL: https://issues.apache.org/jira/browse/FLINK-29701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0, table-store-0.2.2
>Reporter: Shammon
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> Refactor the `flink-table-store-benchmark` to 
> `flink-table-store-cluster-benchmark`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #324: [FLINK-29701] Refactor flink-table-store-benchmark module to cluster benchmark

2022-10-20 Thread GitBox


JingsongLi merged PR #324:
URL: https://github.com/apache/flink-table-store/pull/324


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27344) FLIP-222: Support full job lifecycle statements in SQL client

2022-10-20 Thread Alexey Leonov-Vendrovskiy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621418#comment-17621418
 ] 

Alexey Leonov-Vendrovskiy commented on FLINK-27344:
---

I think it is fine to proceed with the current plan if things are already 
decided and voted upon. The drawback is that it could be more work to 
introduce changes to the grammar (maybe this is already solved?) and a general 
divergence from the standard, which is debatable.

I suggest at least keeping the idea of system stored procedures for future 
developments.

W.r.t. I_S, I agree it might require some changes in the design of catalogs, 
so it looks like not now. The confusion could be resolved with documentation 
;).

 

> FLIP-222: Support full job lifecycle statements in SQL client
> -
>
> Key: FLINK-27344
> URL: https://issues.apache.org/jira/browse/FLINK-27344
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Major
>
> With the efforts in FLIP-24 and FLIP-91, Flink SQL client supports submitting 
> SQL jobs but lacks further support for their lifecycles afterward, which is 
> crucial for streaming use cases. That means Flink SQL client users have to 
> turn to other clients (e.g. CLI) or APIs (e.g. REST API) to manage the jobs, 
> like triggering savepoints or canceling queries, which makes the user 
> experience of the SQL client incomplete. 
> Therefore, this proposal aims to complete the capability of SQL client by 
> adding job lifecycle statements. With these statements, users could manage 
> jobs and savepoints through pure SQL in SQL client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] clarax commented on pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…

2022-10-20 Thread GitBox


clarax commented on PR #406:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/406#issuecomment-1286272479

   Local Docker build and deployment to Minikube succeeded, while image build 
and deployment failed on the workflow. Anything else I should look at?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29610) Infinite timeout is used in SavepointHandlers calls to RestfulGateway

2022-10-20 Thread Jiale Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621406#comment-17621406
 ] 

Jiale Tan commented on FLINK-29610:
---

Thanks [~gaoyunhaii] for the extra context. Will look into this and potentially 
follow up here with a PR.

> Infinite timeout is used in SavepointHandlers calls to RestfulGateway
> -
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Priority: Major
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
>  
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.
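> As a generic illustration (plain JDK only, not the actual handler code), an
> async call can be bounded instead of waiting indefinitely:
> {code:java}
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
>
> class TimeoutSketch {
>     // orTimeout (Java 9+) completes the future exceptionally with a
>     // TimeoutException if no result arrives within the bound, instead of
>     // letting callers wait forever as an infinite RPC timeout would.
>     static CompletableFuture<String> bounded(CompletableFuture<String> trigger) {
>         return trigger.orTimeout(60, TimeUnit.SECONDS);
>     }
> }
> {code}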



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


zentol commented on code in PR #21127:
URL: https://github.com/apache/flink/pull/21127#discussion_r1001199344


##
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java:
##
@@ -28,21 +28,100 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the LIST command. */
 class CliFrontendListTest extends CliFrontendTestBase {
 
+private static final List<Long> TRICKY_START_TIMES =
+Arrays.asList(
+1664177946934L,

Review Comment:
   From what I found, you need 32 elements for the sort to _fail with an 
exception_, but you can use fewer elements and still end up with an unsorted 
list.
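   As an illustration, here is a standalone sketch (not Flink's actual CLI 
code; all names are made up) of how a subtraction-based comparator on 
millisecond timestamps violates the `Comparator` contract through int 
overflow, and why TimSort only throws once the input is large enough:
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.Random;
   
   public class ComparatorViolationSketch {
       public static void main(String[] args) {
           // Casting the long difference to int overflows for timestamps that
           // are far apart, so sgn(compare(a, b)) can disagree with
           // -sgn(compare(b, a)), violating the Comparator contract.
           Comparator<Long> broken = (a, b) -> (int) (a - b);
   
           List<Long> startTimes = new ArrayList<>();
           Random rnd = new Random(42);
           for (int i = 0; i < 64; i++) {
               // Millisecond-scale epoch timestamps, like 1664177946934L above.
               startTimes.add(Math.floorMod(rnd.nextLong(), 4_000_000_000_000L));
           }
   
           try {
               // Below ~32 elements TimSort uses binary insertion sort and only
               // produces an unsorted list; larger inputs can make it detect
               // the violation and throw IllegalArgumentException.
               startTimes.sort(broken);
           } catch (IllegalArgumentException e) {
               System.out.println("TimSort detected the violation: " + e.getMessage());
           }
   
           startTimes.sort(Long::compare); // the safe, overflow-free comparison
       }
   }
   ```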



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #21126: [FLINK-29706][build] Remove japicmp dependency bumps

2022-10-20 Thread GitBox


zentol commented on PR #21126:
URL: https://github.com/apache/flink/pull/21126#issuecomment-1286242752

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-20 Thread GitBox


dannycranmer commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1001103974


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, WriteRequest> {

Review Comment:
   @nirtsruya sorry I might not have explained my idea well. I actually meant 
that here we would use your new type:
   - `class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {`
   
   Now we can modify `DynamoDbWriteRequest` later to support the conditional 
fields for non-batch mode. Inside `submitRequestEntries` we would convert 
`DynamoDbWriteRequest` into `WriteRequest` for batch or `PutItemRequest` for 
non-batch. We would also need to convert `WriteRequest` back into 
`DynamoDbWriteRequest` for failed records (need to check we have all the 
required info).
   
   There will be a slight performance hit when we retry, as records are 
converted multiple times. But this allows us to evolve the connector to support 
batch/non-batch without having to duplicate all the classes, as you explained.
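   A rough sketch of that conversion round-trip, assuming a minimal 
hypothetical `DynamoDbWriteRequest` shape (the real type was still under 
review in this PR) and covering only the put path:
   
   ```java
   import java.util.Map;
   
   import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
   import software.amazon.awssdk.services.dynamodb.model.PutRequest;
   import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
   
   /** Hypothetical minimal element type produced by the sink's ElementConverter. */
   final class DynamoDbWriteRequest {
       final Map<String, AttributeValue> item;
   
       DynamoDbWriteRequest(Map<String, AttributeValue> item) {
           this.item = item;
       }
   }
   
   final class RequestConversions {
       // Batch path: wrap the item into the SDK's WriteRequest for BatchWriteItem.
       static WriteRequest toWriteRequest(DynamoDbWriteRequest request) {
           return WriteRequest.builder()
                   .putRequest(PutRequest.builder().item(request.item).build())
                   .build();
       }
   
       // Failed records come back from BatchWriteItem as WriteRequests and must
       // be converted back so the AsyncSinkWriter can re-queue them for retry
       // (put-only here; a delete path would need the same treatment).
       static DynamoDbWriteRequest fromWriteRequest(WriteRequest writeRequest) {
           return new DynamoDbWriteRequest(writeRequest.putRequest().item());
       }
   }
   ```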
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21128: [FLINK-29710] Bump minimum supported Hadoop version to 2.10.2

2022-10-20 Thread GitBox


flinkbot commented on PR #21128:
URL: https://github.com/apache/flink/pull/21128#issuecomment-1286119990

   
   ## CI report:
   
   * 655a9680f2a8861b3f43fd7475bf3e66e9264ca2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29710) Upgrade the minimal supported hadoop version to 2.10.2

2022-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29710:
---
Labels: pull-request-available  (was: )

> Upgrade the minimal supported hadoop version to 2.10.2
> --
>
> Key: FLINK-29710
> URL: https://issues.apache.org/jira/browse/FLINK-29710
> Project: Flink
>  Issue Type: Technical Debt
>  Components: FileSystems
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Hadoop 2.8.5 is vulnerable to multiple CVEs such as 
> https://nvd.nist.gov/vuln/detail/CVE-2022-25168 and 
> https://nvd.nist.gov/vuln/detail/CVE-2022-26612, which are classified as 
> Critical. While Flink is not directly impacted by those, we do see 
> vulnerability scanners flag Flink as being vulnerable. We could easily 
> mitigate that by bumping the minimal supported version of Hadoop to 2.10.2.
> Please note that this doesn't break binary protocol compatibility, which 
> means that a 2.10.2 client can still talk to older servers.
> Discussion thread: 
> https://lists.apache.org/thread/tgw2dmnoxm7sdwyjohskmvpk3pdd3qvm



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser opened a new pull request, #21128: [FLINK-29710] Bump minimum supported Hadoop version to 2.10.2

2022-10-20 Thread GitBox


MartijnVisser opened a new pull request, #21128:
URL: https://github.com/apache/flink/pull/21128

   ## What is the purpose of the change
   
   * Bump the minimum supported Hadoop version to 2.10.2
   
   ## Brief change log
   
   * Changed all references from Hadoop 2.8.5 to 2.10.2
   * Moved Hive specific Hadoop versioning into the `flink-connector-hive` POM
   * Make the SQL Gateway use the Hadoop version as defined in the parent POM
   * Make the SQL Client use the Hadoop version as defined in the parent POM
   
   The last 3 steps are taken to supersede, and act as a compromise for, 
https://github.com/apache/flink/pull/18321
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29710) Upgrade the minimal supported hadoop version to 2.10.2

2022-10-20 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-29710:
--

 Summary: Upgrade the minimal supported hadoop version to 2.10.2
 Key: FLINK-29710
 URL: https://issues.apache.org/jira/browse/FLINK-29710
 Project: Flink
  Issue Type: Technical Debt
  Components: FileSystems
Reporter: Martijn Visser
Assignee: Martijn Visser


Hadoop 2.8.5 is vulnerable to multiple CVEs such as 
https://nvd.nist.gov/vuln/detail/CVE-2022-25168 and 
https://nvd.nist.gov/vuln/detail/CVE-2022-26612, which are classified as 
Critical. While Flink is not directly impacted by those, we do see 
vulnerability scanners flag Flink as being vulnerable. We could easily mitigate 
that by bumping the minimal supported version of Hadoop to 2.10.2.

Please note that this doesn't break binary protocol compatibility, which 
means that a 2.10.2 client can still talk to older servers.

Discussion thread: 
https://lists.apache.org/thread/tgw2dmnoxm7sdwyjohskmvpk3pdd3qvm



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29709) Bump Pulsar to 2.10.2

2022-10-20 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-29709:
---

 Summary: Bump Pulsar to 2.10.2
 Key: FLINK-29709
 URL: https://issues.apache.org/jira/browse/FLINK-29709
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufan Sheng


Pulsar released a new version, 2.10.2, which contains a lot of bugfixes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


MartijnVisser commented on PR #21127:
URL: https://github.com/apache/flink/pull/21127#issuecomment-1286072331

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ericxiao251 commented on pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ericxiao251 commented on PR #21077:
URL: https://github.com/apache/flink/pull/21077#issuecomment-1286039619

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-10-20 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621332#comment-17621332
 ] 

Eric Xiao commented on FLINK-29498:
---

[~gaoyunhaii], I believe I have a 
[PR|https://github.com/apache/flink/pull/21077] that is in a reviewable state; 
what is the process of getting folks from the community to review the PR?

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.2
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>
> We are using the async I/O to make HTTP calls and one of the features we 
> wanted to leverage was the retries, so we pulled the newest commit: 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error 
> because {{unorderedWaitWithRetry}} expects a Scala retry strategy. The 
> problem is that retry strategies were only implemented in Java and not Scala 
> in this PR: [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-10-20 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621332#comment-17621332
 ] 

Eric Xiao edited comment on FLINK-29498 at 10/20/22 7:30 PM:
-

Hi [~gaoyunhaii], I believe I have a 
[PR|https://github.com/apache/flink/pull/21077] that is in a reviewable state; 
what is the process of getting folks from the community to review the PR?


was (Author: JIRAUSER295489):
[~gaoyunhaii], I believe I have a 
[PR|https://github.com/apache/flink/pull/21077] that is in a reviewable state, 
what is the process of getting folks from the community to review the PR?

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.2
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>
> We are using the async I/O to make HTTP calls and one of the features we 
> wanted to leverage was the retries, so we pulled the newest commit: 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error 
> because {{unorderedWaitWithRetry}} expects a Scala retry strategy. The 
> problem is that retry strategies were only implemented in Java and not Scala 
> in this PR: [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ericxiao251 commented on pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ericxiao251 commented on PR #21077:
URL: https://github.com/apache/flink/pull/21077#issuecomment-1286022609

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621318#comment-17621318
 ] 

Gyula Fora commented on FLINK-29708:


Looks good!
Maybe we could shorten operatorErrorType -> type.

I am a bit torn about httpResponseCode; we should probably only include it for 
specific error types.
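
For illustration only, the payload could be modeled roughly like this 
(hypothetical class and field names, not actual operator code; 
httpResponseCode left optional per the concern above):
{code:java}
import com.fasterxml.jackson.annotation.JsonInclude;

// Hypothetical sketch of the proposed error-field payload. With NON_NULL
// inclusion, fields left null (e.g. httpResponseCode for non-REST errors,
// or stackTrace when disabled via the spec) are omitted from the JSON.
@JsonInclude(JsonInclude.Include.NON_NULL)
class OperatorError {
    public String type;              // e.g. "JobManagerNotFoundException"
    public String message;
    public String stackTrace;        // optional, toggled via the spec
    public Integer httpResponseCode; // only for REST-related error types
}
{code}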

> Enrich Flink Kubernetes Operator CRD error field
> 
>
> Key: FLINK-29708
> URL: https://issues.apache.org/jira/browse/FLINK-29708
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Daren Wong
>Assignee: Daren Wong
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> h1. Problem Statement:
> The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field 
> of String type. Currently, this field stores various errors such as:
>  * CR validation error
>  * Missing SessionJob error/ Missing JobManager deployment error
>  * Unknown Job error
>  * DeploymentFailedException
>  * ReconciliationError such as RestClientException from Flink Internal such 
> as FlinkRest and FlinkRuntime
> It is insufficient to store each error simply as a string. We need to 
> include some exception metadata to help the operator handle the error 
> accordingly. For example, it is very useful to know the HttpResponseStatus 
> code from RestClientException.
> h1. Proposed Solution:
>  * The error field should store a JSON with exception metadata. For example:
> {code:java}
> {
>   "operatorErrorType": "JobManagerNotFoundException",
>   "message": "JobManager with leadership ID: 1234 was not found",
>   "stackTrace": "JobManager lost connection at ", 
>   "httpResponseCode": "400"
> } {code}
>  * The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-29708:
--

Assignee: Daren Wong

> Enrich Flink Kubernetes Operator CRD error field
> 
>
> Key: FLINK-29708
> URL: https://issues.apache.org/jira/browse/FLINK-29708
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Daren Wong
>Assignee: Daren Wong
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> h1. Problem Statement:
> The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field 
> of String type. Currently, this field stores various errors such as:
>  * CR validation error
>  * Missing SessionJob error/ Missing JobManager deployment error
>  * Unknown Job error
>  * DeploymentFailedException
>  * ReconciliationError such as RestClientException from Flink Internal such 
> as FlinkRest and FlinkRuntime
> It is insufficient to store each error simply as a string. We need to 
> include some exception metadata to help the operator handle the error 
> accordingly. For example, it is very useful to know the HttpResponseStatus 
> code from RestClientException.
> h1. Proposed Solution:
>  * The error field should store a JSON with exception metadata. For example:
> {code:java}
> {
>   "operatorErrorType": "JobManagerNotFoundException",
>   "message": "JobManager with leadership ID: 1234 was not found",
>   "stackTrace": "JobManager lost connection at ", 
>   "httpResponseCode": "400"
> } {code}
>  * The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Yordan Pavlov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621314#comment-17621314
 ] 

Yordan Pavlov commented on FLINK-16419:
---

Thanks for your feedback [~martijnvisser] 

> If you're talking about unlimited time as a recovery mechanism, you also 
> retain your Kafka events forever, correct?
We would want this to be irrelevant when talking about the Sink. Previous data 
may or may not be in the output topic.


> Are you still on Flink 1.14 and using the KafkaSink?
We have a fleet of Flink jobs running different versions; I think this issue 
is consistent across 1.14 and 1.15.2.

> Can you verify that you've taken all the necessary steps for enabling 
> Exactly-Once properly? 
I can verify that this matches our setup.

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka.  For example the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: 
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 
> 1202020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata            
>                 - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 92233720
> 36854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions need not be recommitted upon recovery.
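> A conceptual sketch of the bookkeeping this proposal implies (hypothetical
> names, not a real implementation):
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
>
> // Hypothetical: record the IDs of transactions whose commit was acknowledged
> // into the snapshot, so that recovery can skip recommitting them.
> class CommittedTxnBookkeeping {
>     private final Set<String> committedTxnIds = new HashSet<>();
>
>     void onCommitAcknowledged(String txnId) {
>         committedTxnIds.add(txnId);
>     }
>
>     boolean needsRecommit(String txnId) {
>         return !committedTxnIds.contains(txnId);
>     }
> }
> {code}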



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ericxiao251 commented on a diff in pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ericxiao251 commented on code in PR #21077:
URL: https://github.com/apache/flink/pull/21077#discussion_r1000959923


##
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import java.util
+import java.util.function.Predicate
+
+object AsyncRetryStrategies {
+
+  class RetryPredicate[OUT](
+  final val resultPredicate: Option[Predicate[util.Collection[OUT]]],
+  final val exceptionPredicate: Option[Predicate[Throwable]])
+extends AsyncRetryPredicate[OUT]
+
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategy[OUT](
+  private val maxAttempts: Int,
+  private val backoffTimeMillis: Long,
+  private val resultPredicate: Option[Predicate[util.Collection[OUT]]],
+  private val exceptionPredicate: Option[Predicate[Throwable]]
+  ) extends AsyncRetryStrategy[OUT] {
+override def canRetry(currentAttempts: Int): Boolean = currentAttempts <= 
maxAttempts
+
+override def getBackoffTimeMillis(currentAttempts: Int): Long = {
+  backoffTimeMillis
+}
+
+override def getRetryPredicate(): AsyncRetryPredicate[OUT] =
+  new RetryPredicate(resultPredicate, exceptionPredicate)
+  }
+
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategyBuilder[OUT](
+  private val maxAttempts: Int,
+  private val backoffTimeMillis: Long
+  ) {
+private var resultRetryPredicate: Option[Predicate[util.Collection[OUT]]] 
= None

Review Comment:
   Good catch, added :)! Interesting that we don't do some of these checks on 
the exponential retry strategy. Will leave this unresolved for visibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Daren Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daren Wong updated FLINK-29708:
---
Description: 
h1. Problem Statement:

FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:
 * CR validation error
 * Missing SessionJob error/ Missing JobManager deployment error
 * Unknown Job error
 * DeploymentFailedException
 * ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.
h1. Proposed Solution:
 * The error field should store a JSON with exception metadata. For example:


{code:java}
{
  "operatorErrorType": "JobManagerNotFoundException",
  "message": "JobManager with leadership ID: 1234 was not found",
  "stackTrace": "JobManager lost connection at ", 
  "httpResponseCode": "400"
} {code}

 * The stackTrace field can be enabled or disabled via spec change.

  was:
h1. Problem Statement:

FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:
 * CR validation error
 * Missing SessionJob error/ Missing JobManager deployment error
 * Unknown Job error
 * DeploymentFailedException
 * ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.
h1. Proposed Solution:
 * The error field should store a JSON with exception metadata. For example:

{ "operatorErrorType": "JobManagerNotFoundException", "message": "JobManager 
with leadership ID: 1234 was not found", "stackTrace": "JobManager lost 
connection at ", "httpResponseCode": 400 }
 * The stackTrace field can be enabled or disabled via spec change.


> Enrich Flink Kubernetes Operator CRD error field
> 
>
> Key: FLINK-29708
> URL: https://issues.apache.org/jira/browse/FLINK-29708
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Daren Wong
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> h1. Problem Statement:
> The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field 
> of String type. Currently, this field stores various errors such as:
>  * CR validation error
>  * Missing SessionJob error/ Missing JobManager deployment error
>  * Unknown Job error
>  * DeploymentFailedException
>  * ReconciliationError such as RestClientException from Flink Internal such 
> as FlinkRest and FlinkRuntime
> It is insufficient to store each error simply as a string. We need to 
> include some exception metadata to help the operator handle the error 
> accordingly. For example, it is very useful to know the HttpResponseStatus 
> code from RestClientException.
> h1. Proposed Solution:
>  * The error field should store a JSON with exception metadata. For example:
> {code:java}
> {
>   "operatorErrorType": "JobManagerNotFoundException",
>   "message": "JobManager with leadership ID: 1234 was not found",
>   "stackTrace": "JobManager lost connection at ", 
>   "httpResponseCode": "400"
> } {code}
>  * The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-10-20 Thread GitBox


MartijnVisser commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1285943401

   Asking @godfreyhe to have a look at the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Daren Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daren Wong updated FLINK-29708:
---
Description: 
h1. Problem Statement:

FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:

1. CR validation error
2. Missing SessionJob error/ Missing JobManager deployment error
3. Unknown Job error
4. DeploymentFailedException
5. ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.

h1. Proposed Solution:

1. The error field should store a JSON with exception metadata. For example:

{
"operatorErrorType": "JobManagerNotFoundException",
"message": "JobManager with leadership ID: 1234 was not found",
"stackTrace": "JobManager lost connection at ", 
"httpResponseCode": 400
}

2. The stackTrace field can be enabled or disabled via spec change.

  was:
*Problem Statement:
*
FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:

1. CR validation error
2. Missing SessionJob error/ Missing JobManager deployment error
3. Unknown Job error
4. DeploymentFailedException
5. ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.

*Proposed Solution:
*

1. The error field should store a JSON with exception metadata. For example:

{
"operatorErrorType": "JobManagerNotFoundException",
"message": "JobManager with leadership ID: 1234 was not found",
"stackTrace": "JobManager lost connection at ", 
"httpResponseCode": 400
}

2. The stackTrace field can be enabled or disabled via spec change.


> Enrich Flink Kubernetes Operator CRD error field
> 
>
> Key: FLINK-29708
> URL: https://issues.apache.org/jira/browse/FLINK-29708
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Daren Wong
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> h1. Problem Statement:
> The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field 
> of String type. Currently, this field stores various errors such as:
> 1. CR validation error
> 2. Missing SessionJob error/ Missing JobManager deployment error
> 3. Unknown Job error
> 4. DeploymentFailedException
> 5. ReconciliationError such as RestClientException from Flink Internal such 
> as FlinkRest and FlinkRuntime
> It is insufficient to store each error simply as a string. We need to 
> include some exception metadata to help the operator handle the error 
> accordingly. For example, it is very useful to know the HttpResponseStatus 
> code from RestClientException.
> h1. Proposed Solution:
> 1. The error field should store a JSON with exception metadata. For example:
> {
> "operatorErrorType": "JobManagerNotFoundException",
> "message": "JobManager with leadership ID: 1234 was not found",
> "stackTrace": "JobManager lost connection at ", 
> "httpResponseCode": 400
> }
> 2. The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Daren Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daren Wong updated FLINK-29708:
---
Description: 
h1. Problem Statement:

FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:
 * CR validation error
 * Missing SessionJob error/ Missing JobManager deployment error
 * Unknown Job error
 * DeploymentFailedException
 * ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.
h1. Proposed Solution:
 * The error field should store a JSON with exception metadata. For example:

{ "operatorErrorType": "JobManagerNotFoundException", "message": "JobManager 
with leadership ID: 1234 was not found", "stackTrace": "JobManager lost 
connection at ", "httpResponseCode": 400 }
 * The stackTrace field can be enabled or disabled via spec change.

  was:
h1. Problem Statement:

FlinkDeployment and FlinkSessionJob CRD has a CommonStatus error field of 
String type. Currently, this field stores various errors such as:

1. CR validation error
2. Missing SessionJob error/ Missing JobManager deployment error
3. Unknown Job error
4. DeploymentFailedException
5. ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as string only. We need to 
include some exception metadata to help operator handle this error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.

h1. Proposed Solution:

1. The error field should store a JSON with exception metadata. For example:

{
"operatorErrorType": "JobManagerNotFoundException",
"message": "JobManager with leadership ID: 1234 was not found",
"stackTrace": "JobManager lost connection at ", 
"httpResponseCode": 400
}

2. The stackTrace field can be enabled or disabled via spec change.


> Enrich Flink Kubernetes Operator CRD error field
> 
>
> Key: FLINK-29708
> URL: https://issues.apache.org/jira/browse/FLINK-29708
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Daren Wong
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> h1. Problem Statement:
> The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field 
> of String type. Currently, this field stores various errors such as:
>  * CR validation error
>  * Missing SessionJob error/ Missing JobManager deployment error
>  * Unknown Job error
>  * DeploymentFailedException
>  * ReconciliationError such as RestClientException from Flink Internal such 
> as FlinkRest and FlinkRuntime
> It is insufficient to store each error simply as a string. We need to 
> include some exception metadata to help the operator handle the error 
> accordingly. For example, it is very useful to know the HttpResponseStatus 
> code from RestClientException.
> h1. Proposed Solution:
>  * The error field should store a JSON with exception metadata. For example:
> { "operatorErrorType": "JobManagerNotFoundException", "message": "JobManager 
> with leadership ID: 1234 was not found", "stackTrace": "JobManager lost 
> connection at ", "httpResponseCode": 400 }
>  * The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] clarax commented on pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…

2022-10-20 Thread GitBox


clarax commented on PR #406:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/406#issuecomment-1285941481

   "mvn clean verify" and local rerun of the failed test "testRollbackSession" 
passed. The errors in the log seems transient  errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29708) Enrich Flink Kubernetes Operator CRD error field

2022-10-20 Thread Daren Wong (Jira)
Daren Wong created FLINK-29708:
--

 Summary: Enrich Flink Kubernetes Operator CRD error field
 Key: FLINK-29708
 URL: https://issues.apache.org/jira/browse/FLINK-29708
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Daren Wong
 Fix For: kubernetes-operator-1.3.0


*Problem Statement:*

The FlinkDeployment and FlinkSessionJob CRDs have a CommonStatus error field of 
String type. Currently, this field stores various errors such as:

1. CR validation error
2. Missing SessionJob error/ Missing JobManager deployment error
3. Unknown Job error
4. DeploymentFailedException
5. ReconciliationError such as RestClientException from Flink Internal such as 
FlinkRest and FlinkRuntime

It is insufficient to store each error simply as a string. We need to include 
some exception metadata to help the operator handle the error accordingly. 
For example, it is very useful to know the HttpResponseStatus code from 
RestClientException.

*Proposed Solution:*

1. The error field should store a JSON with exception metadata. For example:

{
"operatorErrorType": "JobManagerNotFoundException",
"message": "JobManager with leadership ID: 1234 was not found",
"stackTrace": "JobManager lost connection at ", 
"httpResponseCode": 400
}

2. The stackTrace field can be enabled or disabled via spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621298#comment-17621298
 ] 

Martijn Visser edited comment on FLINK-16419 at 10/20/22 5:59 PM:
--

[~YordanPavlov] Let's see if we can resolve this. A couple of questions to 
confirm:

- If you're talking about unlimited time as a recovery mechanism, you also 
retain your Kafka events forever, correct?
- Are you still on Flink 1.14 and using the KafkaSink?
- You're still using the default Kafka Client that's bundled with that Flink 
version?
- Can you verify that you've taken all the necessary steps for enabling 
Exactly-Once properly? There's an example cookbook recipe that shows all the 
steps at 
https://docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/




was (Author: martijnvisser):
[~YordanPavlov] Let's see if we can resolve this. A couple of questions to 
confirm:

- If you're talking about unlimited time as a recovery mechanism, you retain 
your Kafka events also for ever, correct?
- Are you still on Flink 1.14 and using the KafkaSink?
- Can you verify that you've taken all the necessary steps for enabling 
Exactly-Once properly? There's an example cookbook recipe that shows all the 
steps at 
https://docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/



> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka.  For example the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: 
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 
> 1202020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata            
>                 - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 92233720
> 36854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors.

[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621298#comment-17621298
 ] 

Martijn Visser commented on FLINK-16419:


[~YordanPavlov] Let's see if we can resolve this. A couple of questions to 
confirm:

- If you're talking about unlimited time as a recovery mechanism, you also 
retain your Kafka events forever, correct?
- Are you still on Flink 1.14 and using the KafkaSink?
- Can you verify that you've taken all the necessary steps for enabling 
Exactly-Once properly? There's an example cookbook recipe that shows all the 
steps at 
https://docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/



> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka.  For example the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: 
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 
> 1202020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata            
>                 - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 92233720
> 36854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions need not be recommitted upon recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] clarax commented on a diff in pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…

2022-10-20 Thread GitBox


clarax commented on code in PR #406:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/406#discussion_r1000953174


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointStatus.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;

Review Comment:
   also reverted the doc updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16419 ]


Martijn Visser deleted comment on FLINK-16419:


was (Author: martijnvisser):
[~YordanPavlov] Just to verify, do you have unlimited retention for your Kafka 
events? 

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with a FlinkKafkaProducer sink with exactly-once semantics
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions would not need to be recommitted upon recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621293#comment-17621293
 ] 

Martijn Visser commented on FLINK-16419:


[~YordanPavlov] Just to verify, do you have unlimited retention for your Kafka 
events? 

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with a FlinkKafkaProducer sink with exactly-once semantics
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions would not need to be recommitted upon recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ericxiao251 commented on a diff in pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ericxiao251 commented on code in PR #21077:
URL: https://github.com/apache/flink/pull/21077#discussion_r1000938133


##
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import java.util
+import java.util.function.Predicate
+
+object AsyncRetryStrategies {
+

Review Comment:
   I looked in the codebase and I believe that field is only used in the deeper 
Flink internals of the Java API; with regard to the Scala API, I didn't see a 
benefit of bringing it over.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] clarax commented on a diff in pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…

2022-10-20 Thread GitBox


clarax commented on code in PR #406:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/406#discussion_r1000931100


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointStatus.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;

Review Comment:
   Makes sense. Moved to package org.apache.flink.kubernetes.operator.observer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29582) SavepointWriter should be usable without any transformation

2022-10-20 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-29582.

Resolution: Fixed

master: d17ceaf2f8cb0a36c2b629b9f6f87e020ce79395

> SavepointWriter should be usable without any transformation
> ---
>
> Key: FLINK-29582
> URL: https://issues.apache.org/jira/browse/FLINK-29582
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> The SavepointWriter of the state processor API currently enforces at least 
> one transformation to be defined by the user.
> This is an irritating limitation; this means you can't use the API to delete 
> a state or use the new uid remapping function from FLINK-29457 without 
> specifying some dummy transformation.
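
A hedged sketch of the call sequence this change unblocks (method names assumed from the state processor API around 1.16/1.17, with {{OperatorIdentifier}} coming from FLINK-29457; the paths are placeholders):
{code:java}
SavepointWriter
        .fromExistingSavepoint("s3://bucket/savepoints/savepoint-xyz")
        // Drop one operator's state; with this change no dummy transformation
        // has to be registered alongside the removal.
        .removeOperator(OperatorIdentifier.forUid("obsolete-operator"))
        .write("s3://bucket/savepoints/savepoint-xyz-trimmed");
{code}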



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #21088: [FLINK-29582][state-processor] Relax 1+ operator requirement

2022-10-20 Thread GitBox


zentol merged PR #21088:
URL: https://github.com/apache/flink/pull/21088


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25841) Expose plan via TableEnvironment.compilePlanSql/executePlan

2022-10-20 Thread Harry Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621266#comment-17621266
 ] 

Harry Li commented on FLINK-25841:
--

Right now the CompiledPlan feature works for INSERT statements. Is there a plan 
or roadmap to support other types of statements, for example SELECT?
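
For reference, a hedged sketch of the INSERT-only flow as exposed today (API names from FLIP-190; the table names and file path are placeholders):
{code:java}
import org.apache.flink.table.api.*;

TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Compile once, persist the plan, then restore and execute it later,
// e.g. across a Flink version upgrade.
CompiledPlan plan = env.compilePlanSql("INSERT INTO sink_table SELECT * FROM source_table");
plan.writeToFile("/tmp/plan.json");

env.loadPlan(PlanReference.fromFile("/tmp/plan.json")).execute();
{code}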

> Expose plan via TableEnvironment.compilePlanSql/executePlan
> ---
>
> Key: FLINK-25841
> URL: https://issues.apache.org/jira/browse/FLINK-25841
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> - Introduce the helper API class 
> [CompiledPlan|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-CompiledPlan]
> - Allow [single SQL 
> statements|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-SingleSQLStatements]
>  to be generated and restored
> Mark all interfaces as {{@Experimental}}.
> What is particularly important is that we also test the end-to-end 
> experience: how good and helpful are our exceptions for legacy types, legacy 
> functions, inline tables, and other things that cannot be persisted?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ryanvanhuuksloot commented on a diff in pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ryanvanhuuksloot commented on code in PR #21077:
URL: https://github.com/apache/flink/pull/21077#discussion_r1000885089


##
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import java.util
+import java.util.function.Predicate
+
+object AsyncRetryStrategies {
+
+  class RetryPredicate[OUT](

Review Comment:
   This should be private



##
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import java.util
+import java.util.function.Predicate
+
+object AsyncRetryStrategies {
+

Review Comment:
   Can you add a `NoRetryStrategy` like the Java version?



##
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import java.util
+import java.util.function.Predicate
+
+object AsyncRetryStrategies {
+
+  class RetryPredicate[OUT](
+  final val resultPredicate: Option[Predicate[util.Collection[OUT]]],
+  final val exceptionPredicate: Option[Predicate[Throwable]])
+extends AsyncRetryPredicate[OUT]
+
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategy[OUT](
+  private val maxAttempts: Int,
+  private val backoffTimeMillis: Long,
+  private val resultPredicate: Option[Predicate[util.Collection[OUT]]],
+  private val exceptionPredicate: Option[Predicate[Throwable]]
+  ) extends AsyncRetryStrategy[OUT] {
+override def canRetry(currentAttempts: Int): Boolean = currentAttempts <= maxAttempts
+
+override def getBackoffTimeMillis(currentAttempts: Int): Long = {
+  backoffTimeMillis
+}
+
+override def getRetryPredicate(): AsyncRetryPredicate[OUT] =
+  new RetryPredicate(resultPredicate, exceptionPredicate)
+  }
+
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategyBuilder[OUT](
+  private val maxAttempts: Int,
+  private val backoffTimeMillis: Long
+  ) {
+private var resultRetryPredicate: Option[Predicate[util.Collection[OUT]]] = None

Review Comment:
   Add argument checking like the Java version (maxAttempts and backoffTimeMillis > 0), as sketched below.
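
   For reference, a hedged sketch of the kind of checks meant here, mirroring how the Java builder validates its inputs (`Preconditions` is `org.apache.flink.util.Preconditions`; the builder shape is abbreviated from the Scala code above):
   ```java
   import org.apache.flink.util.Preconditions;

   public class FixedDelayRetryStrategyBuilder<OUT> {
       private final int maxAttempts;
       private final long backoffTimeMillis;

       public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
           // Reject non-positive values up front, as the Java version does.
           Preconditions.checkArgument(maxAttempts > 0, "maxAttempts must be positive");
           Preconditions.checkArgument(backoffTimeMillis > 0, "backoffTimeMillis must be positive");
           this.maxAttempts = maxAttempts;
           this.backoffTimeMillis = backoffTimeMillis;
       }
   }
   ```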



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


ferenc-csaky commented on code in PR #21127:
URL: https://github.com/apache/flink/pull/21127#discussion_r1000885230


##
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java:
##
@@ -28,21 +28,100 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the LIST command. */
 class CliFrontendListTest extends CliFrontendTestBase {
 
+private static final List<Long> TRICKY_START_TIMES =
+Arrays.asList(
+1664177946934L,

Review Comment:
   I played a bit with the test data (randomly), but it passes pretty quickly, 
so I just included the whole faulty dataset as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ferenc-csaky commented on pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


ferenc-csaky commented on PR #21127:
URL: https://github.com/apache/flink/pull/21127#issuecomment-1285847522

   > I don't get how this can be JDK-specific. Probably related to overflows, 
no?
   > 
   > Is the test replicating the exact case you ran into?
   
   TBH I'm not sure anymore :) I did not debug it thoroughly because the fix 
was easy. I went through 
[this](https://stackoverflow.com/questions/31274578/java-lang-illegalargumentexception-comparison-method-violates-its-general-contr)
 thread. It is not the same thing that happened here: in those cases there were 
mixed object types in the list, and it was explicitly mentioned that it works 
with same-type objects.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


ferenc-csaky commented on code in PR #21127:
URL: https://github.com/apache/flink/pull/21127#discussion_r1000864000


##
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java:
##
@@ -28,21 +28,100 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the LIST command. */
 class CliFrontendListTest extends CliFrontendTestBase {
 
+private static final List<Long> TRICKY_START_TIMES =
+Arrays.asList(
+1664177946934L,

Review Comment:
   There was another similar issue I found: 
https://github.com/apache/flink/pull/5878/files



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #21088: [FLINK-29582][state-processor] Relax 1+ operator requirement

2022-10-20 Thread GitBox


zentol commented on PR #21088:
URL: https://github.com/apache/flink/pull/21088#issuecomment-1285836764

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2022-10-20 Thread Yordan Pavlov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621220#comment-17621220
 ] 

Yordan Pavlov commented on FLINK-16419:
---

[~martijnvisser] This issue has plagued our Flink setup for quite some time now, 
under different versions. We are struggling to use savepoints with Kafka EOS. 
Could you elaborate on what our options are so that savepoints work as a recovery 
mechanism for an unlimited time into the future? I would restate my previous 
questions (see the sketch below for the command in question 1):
 # Can we force Flink not to leave open transactions by triggering the 'savepoint 
and stop' mechanism, i.e., stopping the job with a savepoint? Is this a good 
approach?
 # If the above is 'yes', can we force Flink to generate the same type of savepoint 
without exiting and then having to recover the state?
 # Are there any alternatives for how we can back up our progress periodically 
and still use Kafka EOS?

Any help is highly appreciated!
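
A hedged sketch of the stop-with-savepoint invocation referenced in question 1 (the savepoint path and job ID are placeholders; exact flags depend on the Flink version):
{noformat}
# Stop the job with a savepoint. --drain additionally emits MAX_WATERMARK and
# flushes in-flight records before the job shuts down.
./bin/flink stop --savepointPath s3://bucket/savepoints --drain <jobId>
{noformat}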

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with a FlinkKafkaProducer sink with exactly-once semantics
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions would not need to be recommitted upon recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-20 Thread GitBox


ferenc-csaky commented on code in PR #21127:
URL: https://github.com/apache/flink/pull/21127#discussion_r1000851448


##
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java:
##
@@ -28,21 +28,100 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the LIST command. */
 class CliFrontendListTest extends CliFrontendTestBase {
 
+private static final List<Long> TRICKY_START_TIMES =
+Arrays.asList(
+1664177946934L,

Review Comment:
   It takes at least 32 items for this issue to occur (Java's TimSort only 
switches from insertion sort to merging at 32 elements, so the contract 
violation cannot surface with fewer). I do not like this either, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-20 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1000808803


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/PrimaryKeyBuilder.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.dynamodb.sink.InvalidRequestException;
+import org.apache.flink.util.CollectionUtil;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** Helper to construct the primary (composite) key for a DynamoDB request. */
+@Internal
+public class PrimaryKeyBuilder {
+
+    public static String build(List<String> pKeys, WriteRequest request) {
+
+        if (CollectionUtil.isNullOrEmpty(pKeys)) {
+            // fake key, because no dynamodb partition key or sort key provided. Using UUID
+            // should be safe here, as we have at most 25 elements per batch
+            return UUID.randomUUID().toString();
+        } else {
+            Map<String, AttributeValue> requestItems = getRequestItems(request);
+
+            StringBuilder builder = new StringBuilder();
+            for (String keyName : pKeys) {
+                AttributeValue value = requestItems.get(keyName);
+
+                if (value == null) {
+                    throw new InvalidRequestException(
+                            "Request " + request.toString() + " does not contain pKey " + keyName);
+                }
+
+                builder.append(getKeyValue(value));

Review Comment:
   very good point. Fixed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-20 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1000808138


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link DynamoDbSink} that writes
+ * records into DynamoDb:
+ *
+ * <pre>{@code
+ * private static class DummyDynamoDbElementConverter
+ *         implements ElementConverter<String, DynamoDbWriteRequest> {
+ *
+ *     @Override
+ *     public DynamoDbWriteRequest apply(String s) {
+ *         final Map<String, AttributeValue> item = new HashMap<>();
+ *         item.put("your-key", AttributeValue.builder().s(s).build());
+ *         return new DynamoDbWriteRequest(
+ *                 WriteRequest.builder()
+ *                         .putRequest(PutRequest.builder()
+ *                                 .item(item)
+ *                                 .build())
+ *                         .build());
+ *     }
+ * }
+ *
+ * DynamoDbSink<String> dynamoDbSink =
+ *         DynamoDbSink.<String>builder()
+ *                 .setElementConverter(new DummyDynamoDbElementConverter())
+ *                 .setDestinationTableName("your-table-name")
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 25
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 1
+ *   <li>{@code maxBatchSizeInBytes} will be 16 MB, i.e. {@code 16 * 1000 * 1000}
+ *   <li>{@code maxTimeInBufferMS} will be 5000 ms
+ *   <li>{@code maxRecordSizeInBytes} will be 400 KB, i.e. {@code 400 * 1000}
+ *   <li>{@code failOnError} will be false
+ *   <li>{@code destinationTableName} destination table for the sink
+ *   <li>{@code overwriteByPKeys} will be empty, meaning no record deduplication will be performed
+ *       by the batch sink
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class DynamoDbSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, DynamoDbWriteRequest, DynamoDbSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 25;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 16 * 1000 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 400 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+
+    private boolean failOnError;
+    private Properties dynamodbClientProperties;
+
+    private ElementConverter<InputT, DynamoDbWriteRequest> elementConverter;
+    private String tableName;
+
+    private List<String> overwriteByPKeys;
+
+    public DynamoDbSinkBuilder<InputT> setDynamoDbProperties(Properties properties) {
+        this.dynamodbClientProperties = properties;
+        return this;
+    }
+
+    public DynamoDbSinkBuilder<InputT> setElementConverter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter) {
+        this.elementConverter = elementConverter;
+        return this;
+    }
+
+    /** Destination DynamoDB table name for the sink to write to. */
+    public DynamoDbSinkBuilder<InputT> setDestinationTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    /**
+     * @param overwriteByPKeys provide the partition key and (optionally) sort key name if you
+     *     want to bypass the no-duplication limitation of a single batch write request. The
+     *     batching DynamoDB sink will drop request items in the buffer if their primary

[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-10-20 Thread GitBox


xinbinhuang commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1285811478

   @godfreyhe @KarmaGYZ @MartijnVisser 👋  can we merge this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ericxiao251 commented on pull request #21077: [FlINK-29248] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

2022-10-20 Thread GitBox


ericxiao251 commented on PR #21077:
URL: https://github.com/apache/flink/pull/21077#issuecomment-1285809373

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29707) Fix possible comparator violation for "flink list"

2022-10-20 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-29707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-29707:
--

Assignee: Ferenc Csaky

> Fix possible comparator violation for "flink list"
> --
>
> Key: FLINK-29707
> URL: https://issues.apache.org/jira/browse/FLINK-29707
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.16.0
>Reporter: Ferenc Csaky
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
>
> For the {{list}} CLI option, in the code that prints the jobs there is a 
> {{startTimeComparator}} definition which orders the jobs, and it is defined this 
> way:
> {code:java}
> Comparator<JobStatusMessage> startTimeComparator =
> (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
> {code}
> In some rare situations this can lead to this:
> {code:java}
> 2022-10-19 09:58:11,690 ERROR org.apache.flink.client.cli.CliFrontend 
>  [] - Error while running the command.
> java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!
>   at java.util.TimSort.mergeLo(TimSort.java:777) ~[?:1.8.0_312]
>   at java.util.TimSort.mergeAt(TimSort.java:514) ~[?:1.8.0_312]
>   at java.util.TimSort.mergeForceCollapse(TimSort.java:457) ~[?:1.8.0_312]
>   at java.util.TimSort.sort(TimSort.java:254) ~[?:1.8.0_312]
>   at java.util.Arrays.sort(Arrays.java:1512) ~[?:1.8.0_312]
>   at java.util.ArrayList.sort(ArrayList.java:1464) ~[?:1.8.0_312]
>   at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:392) 
> ~[?:1.8.0_312]
>   at java.util.stream.Sink$ChainedReference.end(Sink.java:258) 
> ~[?:1.8.0_312]
>   at java.util.stream.Sink$ChainedReference.end(Sink.java:258) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:363) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>  ~[?:1.8.0_312]
>   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
> ~[?:1.8.0_312]
>   at 
> java.util.stream.ReferencePipeline.forEachOrdered(ReferencePipeline.java:490) 
> ~[?:1.8.0_312]
>   at 
> org.apache.flink.client.cli.CliFrontend.printJobStatusMessages(CliFrontend.java:574)
> {code}
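
A hedged sketch of the usual fix: compare the longs directly instead of casting their difference to {{int}}, which can overflow and break the comparator contract ({{JobStatusMessage}} and {{getStartTime()}} as in the snippet above):
{code:java}
Comparator<JobStatusMessage> startTimeComparator =
        Comparator.comparingLong(JobStatusMessage::getStartTime);

// Equivalent explicit form:
Comparator<JobStatusMessage> explicitComparator =
        (o1, o2) -> Long.compare(o1.getStartTime(), o2.getStartTime());
{code}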



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-20 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1000815682


##
flink-connector-dynamodb/src/test/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilderTest.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.Map;
+
+/** Tests for {@link DynamoDbSinkBuilder}. */
+public class DynamoDbSinkBuilderTest {
+
+@Test

Review Comment:
   I didn't notice that had changed. Thanks, I'll migrate it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-20 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1000814379


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbWriteRequest.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Represents a single DynamoDb {@link WriteRequest}. TODO remove this class, 
replace with interface
+ * to support batch and non-batch modes
+ */
+@Internal
+public class DynamoDbWriteRequest implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private final WriteRequest writeRequest;
+
+public DynamoDbWriteRequest(WriteRequest writeRequest) {
+this.writeRequest = writeRequest;
+}
+
+public WriteRequest getWriteRequest() {
+return writeRequest;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+DynamoDbWriteRequest that = (DynamoDbWriteRequest) o;
+return Objects.equals(writeRequest, that.writeRequest);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(writeRequest);
+}

Review Comment:
   I left this class to be removed later, when we migrate to the interface 
approach that @nirtsruya is working on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


