[jira] [Created] (FLINK-24069) IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery hangs on azure
Xintong Song created FLINK-24069:
---------------------------------

Summary: IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery hangs on azure
Key: FLINK-24069
URL: https://issues.apache.org/jira/browse/FLINK-24069
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Xintong Song
Fix For: 1.14.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23144&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=12083

{code}
"main" #1 prio=5 os_prio=0 tid=0x7f90bc00b800 nid=0x86d54 waiting on condition [0x7f90c4e53000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x80f25d48> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1937)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
    at org.apache.flink.test.checkpointing.IgnoreInFlightDataITCase.executeIgnoreInFlightDataDuringRecovery(IgnoreInFlightDataITCase.java:136)
    at org.apache.flink.test.checkpointing.IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery(IgnoreInFlightDataITCase.java:107)
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24068) CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
Yun Gao created FLINK-24068:
----------------------------

Summary: CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
Key: FLINK-24068
URL: https://issues.apache.org/jira/browse/FLINK-24068
Project: Flink
Issue Type: Sub-task
Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao

{code:java}
04:51:11,727 [Flat Map -> Sink: Unnamed (10/12)#1] WARN  org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> Sink: Unnamed (10/12)#1 (0d965fd3c0de11dc7fb6793435cda8ba) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Alignment time is less than zero({}). Is the time monotonic? [-9223369873401849363]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:177)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:253)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:240)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490)
{code}

This is caused by the following sequence:
# SingleCheckpointBarrierHandler first receives a BarrierAnnouncement, which calls checkNewCheckpoint to reset the currentCheckpointId and the alignedChannels.
# SingleCheckpointBarrierHandler then receives an EndOfPartition, which adds the channel to the alignedChannels.
# SingleCheckpointBarrierHandler then receives a barrier, finds that alignedChannels is already non-empty, and therefore skips the markAlignmentStart.

We might change the last step to check whether this is the first barrier received; see the sketch below.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
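A minimal standalone sketch of the race and the proposed check, for readers following along. This is a toy model, not the actual SingleCheckpointBarrierHandler code; all names and the simplified event model are assumptions:

{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of the race described above -- NOT Flink code. Channel events
 * arrive as either an end-of-partition or a checkpoint barrier; alignment
 * must start on the first *barrier*, even if an end-of-partition already
 * put a channel into the aligned set.
 */
public class BarrierHandlerSketch {
    private final Set<String> alignedChannels = new HashSet<>();
    private int barriersReceived = 0;
    private long alignmentStartNanos = -1;

    void onEndOfPartition(String channel) {
        alignedChannels.add(channel); // counts toward alignment completion
    }

    void onBarrier(String channel) {
        // Buggy check was effectively "alignedChannels.isEmpty()": an earlier
        // end-of-partition makes the set non-empty, so alignment never starts.
        // Proposed check: is this the first actual barrier of the checkpoint?
        if (barriersReceived == 0) {
            alignmentStartNanos = System.nanoTime(); // i.e. markAlignmentStart
        }
        barriersReceived++;
        alignedChannels.add(channel);
    }

    long markAlignmentEnd() {
        long elapsed = System.nanoTime() - alignmentStartNanos;
        if (elapsed < 0) {
            // Without the fix, alignmentStartNanos is never set and this fires.
            throw new IllegalStateException("Alignment time is less than zero");
        }
        return elapsed;
    }
}
{code}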
[jira] [Created] (FLINK-24067) CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
Yun Gao created FLINK-24067:
----------------------------

Summary: CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
Key: FLINK-24067
URL: https://issues.apache.org/jira/browse/FLINK-24067
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao

{code:java}
04:51:11,727 [Flat Map -> Sink: Unnamed (10/12)#1] WARN  org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> Sink: Unnamed (10/12)#1 (0d965fd3c0de11dc7fb6793435cda8ba) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Alignment time is less than zero({}). Is the time monotonic? [-9223369873401849363]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:177)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:253)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:240)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490)
{code}

This is caused by the following sequence:
1. SingleCheckpointBarrierHandler first receives a BarrierAnnouncement, which calls checkNewCheckpoint to reset the currentCheckpointId and the alignedChannels.
2. SingleCheckpointBarrierHandler then receives an EndOfPartition, which adds the channel to the alignedChannels.
3. SingleCheckpointBarrierHandler then receives a barrier, finds that alignedChannels is already non-empty, and therefore skips the markAlignmentStart.

We might change step 3 to check whether this is the first barrier received.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24066) Provides a new stop entry for Kubernetes session mode
liuzhuo created FLINK-24066:
----------------------------

Summary: Provides a new stop entry for Kubernetes session mode
Key: FLINK-24066
URL: https://issues.apache.org/jira/browse/FLINK-24066
Project: Flink
Issue Type: Improvement
Components: Deployment / Kubernetes
Reporter: liuzhuo

For the current Native Kubernetes session mode, the way to stop a session is:
{code:java}
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
{code}
or
{code:java}
$ echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true
{code}

I think a more friendly interface should be added to stop the session mode, such as:
{code:java}
$ ./bin/kubernetes-session.sh stop -Dkubernetes.cluster-id=my-first-flink-cluster
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24065) Upgrade the TwoPhaseCommitSink to support empty transaction after finished
Yun Gao created FLINK-24065:
----------------------------

Summary: Upgrade the TwoPhaseCommitSink to support empty transaction after finished
Key: FLINK-24065
URL: https://issues.apache.org/jira/browse/FLINK-24065
Project: Flink
Issue Type: Sub-task
Components: Connectors / Common, Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao

In https://issues.apache.org/jira/browse/FLINK-23473, the TwoPhaseCommitSink was changed to not create new transactions after it has finished, so that no transactions are left over after the job finishes. However, with the current implementation of the TwoPhaseCommitSink we still have to write a transaction into the state for each checkpoint, and the state does not yet support a null transaction, so a NullPointerException is thrown in this case.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
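One conceivable direction is to make the transaction state tolerate absent values. A toy sketch of a null-flag wrapper around a transaction serializer (plain Java with String standing in for the transaction type; this is an assumption, not the actual fix):

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Toy sketch -- not the actual Flink change: a (de)serializer that tolerates
 * a null transaction, e.g. for sinks that stop creating transactions after
 * finishing. A leading boolean marks whether a value is present.
 */
final class NullableTransactionSerializer {
    static void serialize(String txn, DataOutputStream out) throws IOException {
        out.writeBoolean(txn != null);
        if (txn != null) {
            out.writeUTF(txn);
        }
    }

    static String deserialize(DataInputStream in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }
}
{code}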
[jira] [Created] (FLINK-24064) HybridSource recovery from savepoint fails
Thomas Weise created FLINK-24064:
---------------------------------

Summary: HybridSource recovery from savepoint fails
Key: FLINK-24064
URL: https://issues.apache.org/jira/browse/FLINK-24064
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Reporter: Thomas Weise
Assignee: Thomas Weise

Recovery fails because the underlying source and split deserializers are not initialized in the restore code path. This requires deferred deserialization after the current source has been set.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
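For readers unfamiliar with the pattern, a minimal sketch of what deferred deserialization can look like (illustrative names; this is not the actual HybridSource code): keep the serialized bytes until the current underlying source, and thus its split serializer, is known, then deserialize on first access.

{code:java}
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Illustrative sketch only: a split restored as raw bytes, deserialized lazily. */
final class DeferredSplit<SplitT> {
    private final int version;
    private final byte[] bytes;
    private SplitT cached; // populated on first access

    DeferredSplit(int version, byte[] bytes) {
        this.version = version;
        this.bytes = bytes;
    }

    /** Deserialize once the enclosing source has provided its serializer. */
    SplitT get(SimpleVersionedSerializer<SplitT> serializer) throws IOException {
        if (cached == null) {
            cached = serializer.deserialize(version, bytes);
        }
        return cached;
    }
}
{code}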
Re: Support decimal types with larger precisions
Hi Jark and Jingsong, Thanks for your reply! Since modifying the SQL type system needs a lot of work, I agree that we should postpone this until we get more requests from users. For my own case, according to the domain knowledge, I think a precision of 38 would be enough (though the fields were declared without any precision constraints). A user-defined numeric type converter would solve the problem! Thanks, Xingcan On Mon, Aug 30, 2021 at 11:46 PM Jingsong Li wrote: > Hi Xingcan, > > As a workaround, can we convert large decimal to varchar? > > If Flink SQL wants to support large decimal, we should investigate > other big data and databases. As Jark said, this needs a lot of work. > > Best, > Jingsong Lee > > On Tue, Aug 31, 2021 at 11:16 AM Jark Wu wrote: > > > > Hi Xingcan, Timo, > > > > Yes, flink-cdc-connector and JDBC connector also don't support larger > > precision or no precision. > > However, we didn't receive any users reporting this problem. > > Maybe it is not very common that precision is higher than 38 or without > > precision. > > > > I think it makes sense to support this use case, but this definitely > needs > > a lot of work, > > and we need more investigation and discussion (maybe a new type?) > > > > Best, > > Jark > > > > > > On Mon, 30 Aug 2021 at 23:32, Xingcan Cui wrote: > > > > > Hi Timo, > > > > > > Though it's an extreme case, I still think this is a hard blocker if we > > > would ingest data from an RDBMS (and other systems supporting large > > > precision numbers). > > > > > > The tricky part is that users can declare numeric types without any > > > precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), > but > > > in Flink, we must explicitly specify the precision and scale. > > > > > > Cc Jark, do you think this is a problem for flink-cdc-connectors? > > > > > > Best, > > > Xingcan > > > > > > [1] > > > > https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313 > > > > > > On Mon, Aug 30, 2021 at 4:12 AM Timo Walther > wrote: > > > > > >> Hi Xingcan, > > >> > > >> in theory there should be no hard blocker for supporting this. The > > >> implementation should be flexible enough at most locations. We just > > >> adopted 38 from the Blink code base which adopted it from Hive. > > >> > > >> However, this could be a breaking change for existing pipelines and we > > >> would need to offer a flag to bring back the old behavior. It would > > >> definitely lead to a lot of testing work to not cause inconsistencies. > > >> > > >> Do you think this is a hard blocker for users? > > >> > > >> Regards, > > >> Timo > > >> > > >> > > >> On 28.08.21 00:21, Xingcan Cui wrote: > > >> > Hi all, > > >> > > > >> > Recently, I was trying to load some CDC data from Oracle/Postgres > > >> databases > > >> > and found that the current precision range [1, 38] for DecimalType > may > > >> not > > >> > meet the requirement for some source types. For instance, in > Oracle, if > > >> a > > >> > column is declared as `NUMBER` without precision and scale, the > values > > >> in > > >> > it could potentially be very large. As DecimalType is backed by Java > > >> > BigDecimal, I wonder if we should extend the precision range. > > >> > > > >> > Best, > > >> > Xingcan > > >> > > > >> > > >> > > > > -- > Best, Jingsong Lee >
[jira] [Created] (FLINK-24063) Reconsider the behavior of ClusterEntrypoint#startCluster failure handler
Aitozi created FLINK-24063:
---------------------------

Summary: Reconsider the behavior of ClusterEntrypoint#startCluster failure handler
Key: FLINK-24063
URL: https://issues.apache.org/jira/browse/FLINK-24063
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: Aitozi

If runCluster fails, it triggers the STOP_APPLICATION behavior. But consider a case like this:
1. The JobManager encounters a fatal error, such as a network problem, which brings the JobManager process down.
2. A new process is then started by the resource framework (YARN or Kubernetes), but it fails in ClusterEntrypoint#startCluster due to the same network problem.
3. The job then turns into the FAILED status.

This means a streaming job will no longer run after some fatal error, which is somewhat fragile. I think we should provide a retry mechanism to prevent the job from failing fast twice, so that it can cope with external errors that persist for a period of time.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
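A rough illustration of such a retry mechanism (plain Java; the method names, attempt count, and backoff policy are all assumptions, not actual Flink code):

{code:java}
/** Toy sketch of retrying cluster startup before giving up -- illustrative only. */
final class StartupRetrySketch {
    static void startWithRetry(Runnable startCluster, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                startCluster.run();
                return; // startup succeeded
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up -> STOP_APPLICATION, as today
                }
                // Linear backoff so a transient external error has time to clear.
                Thread.sleep(backoffMillis * attempt);
            }
        }
    }
}
{code}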
[jira] [Created] (FLINK-24062) Exception encountered during timer serialization in Python DataStream API
Dian Fu created FLINK-24062:
----------------------------

Summary: Exception encountered during timer serialization in Python DataStream API
Key: FLINK-24062
URL: https://issues.apache.org/jira/browse/FLINK-24062
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
Fix For: 1.14.0

For the following example:
{code}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink,
                                           OutputFileConfig, RollingPolicy)

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()
{code}
Re: Support decimal types with larger precisions
Hi Xingcan, As a workaround, can we convert large decimal to varchar? If Flink SQL wants to support large decimal, we should investigate other big data and databases. As Jark said, this needs a lot of work. Best, Jingsong Lee On Tue, Aug 31, 2021 at 11:16 AM Jark Wu wrote: > > Hi Xingcan, Timo, > > Yes, flink-cdc-connector and JDBC connector also don't support larger > precision or no precision. > However, we didn't receive any users reporting this problem. > Maybe it is not very common that precision is higher than 38 or without > precision. > > I think it makes sense to support this use case, but this definitely needs > a lot of work, > and we need more investigation and discussion (maybe a new type?) > > Best, > Jark > > > On Mon, 30 Aug 2021 at 23:32, Xingcan Cui wrote: > > > Hi Timo, > > > > Though it's an extreme case, I still think this is a hard blocker if we > > would ingest data from an RDBMS (and other systems supporting large > > precision numbers). > > > > The tricky part is that users can declare numeric types without any > > precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but > > in Flink, we must explicitly specify the precision and scale. > > > > Cc Jark, do you think this is a problem for flink-cdc-connectors? > > > > Best, > > Xingcan > > > > [1] > > https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313 > > > > On Mon, Aug 30, 2021 at 4:12 AM Timo Walther wrote: > > > >> Hi Xingcan, > >> > >> in theory there should be no hard blocker for supporting this. The > >> implementation should be flexible enough at most locations. We just > >> adopted 38 from the Blink code base which adopted it from Hive. > >> > >> However, this could be a breaking change for existing pipelines and we > >> would need to offer a flag to bring back the old behavior. It would > >> definitely lead to a lot of testing work to not cause inconsistencies. > >> > >> Do you think this is a hard blocker for users? > >> > >> Regards, > >> Timo > >> > >> > >> On 28.08.21 00:21, Xingcan Cui wrote: > >> > Hi all, > >> > > >> > Recently, I was trying to load some CDC data from Oracle/Postgres > >> databases > >> > and found that the current precision range [1, 38] for DecimalType may > >> not > >> > meet the requirement for some source types. For instance, in Oracle, if > >> a > >> > column is declared as `NUMBER` without precision and scale, the values > >> in > >> > it could potentially be very large. As DecimalType is backed by Java > >> > BigDecimal, I wonder if we should extend the precision range. > >> > > >> > Best, > >> > Xingcan > >> > > >> > >> -- Best, Jingsong Lee
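[Editorial note: a small standalone illustration of the 38-digit limit under discussion and the varchar workaround Jingsong suggests. Plain Java; the numeric value is made up.]

    import java.math.BigDecimal;

    public class LargeDecimalDemo {
        public static void main(String[] args) {
            // A value like Oracle's unconstrained NUMBER can hold: more than
            // 38 significant digits, beyond DecimalType's maximum precision.
            BigDecimal big =
                new BigDecimal("123456789012345678901234567890123456789.5");
            System.out.println(big.precision()); // prints 40 -- over the limit of 38
            // Workaround from this thread: carry the value as a string instead.
            String asVarchar = big.toPlainString();
            System.out.println(asVarchar);
        }
    }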
[jira] [Created] (FLINK-24061) RMQSourceITCase.testAckFailure fails on azure
Xintong Song created FLINK-24061:
---------------------------------

Summary: RMQSourceITCase.testAckFailure fails on azure
Key: FLINK-24061
URL: https://issues.apache.org/jira/browse/FLINK-24061
Project: Flink
Issue Type: Bug
Components: Connectors / RabbitMQ
Affects Versions: 1.15.0
Reporter: Xintong Song
Fix For: 1.15.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23141&view=logs&j=ba53eb01-1462-56a3-8e98-0dd97fbcaab5&t=2e426bf0-b717-56bb-ab62-d63086457354&l=14514

{code}
Aug 30 22:30:34 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 93.936 s <<< FAILURE! - in org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
Aug 30 22:30:34 [ERROR] testAckFailure  Time elapsed: 61.713 s  <<< ERROR!
Aug 30 22:30:34 java.util.concurrent.TimeoutException: Condition was not met in given timeout.
Aug 30 22:30:34     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:158)
Aug 30 22:30:34     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:136)
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: Support decimal types with larger precisions
Hi Xingcan, Timo, Yes, flink-cdc-connector and JDBC connector also don't support larger precision or no precision. However, we didn't receive any users reporting this problem. Maybe it is not very common that precision is higher than 38 or without precision. I think it makes sense to support this use case, but this definitely needs a lot of work, and we need more investigation and discussion (maybe a new type?) Best, Jark On Mon, 30 Aug 2021 at 23:32, Xingcan Cui wrote: > Hi Timo, > > Though it's an extreme case, I still think this is a hard blocker if we > would ingest data from an RDBMS (and other systems supporting large > precision numbers). > > The tricky part is that users can declare numeric types without any > precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but > in Flink, we must explicitly specify the precision and scale. > > Cc Jark, do you think this is a problem for flink-cdc-connectors? > > Best, > Xingcan > > [1] > https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313 > > On Mon, Aug 30, 2021 at 4:12 AM Timo Walther wrote: > >> Hi Xingcan, >> >> in theory there should be no hard blocker for supporting this. The >> implementation should be flexible enough at most locations. We just >> adopted 38 from the Blink code base which adopted it from Hive. >> >> However, this could be a breaking change for existing pipelines and we >> would need to offer a flag to bring back the old behavior. It would >> definitely lead to a lot of testing work to not cause inconsistencies. >> >> Do you think this is a hard blocker for users? >> >> Regards, >> Timo >> >> >> On 28.08.21 00:21, Xingcan Cui wrote: >> > Hi all, >> > >> > Recently, I was trying to load some CDC data from Oracle/Postgres >> databases >> > and found that the current precision range [1, 38] for DecimalType may >> not >> > meet the requirement for some source types. For instance, in Oracle, if >> a >> > column is declared as `NUMBER` without precision and scale, the values >> in >> > it could potentially be very large. As DecimalType is backed by Java >> > BigDecimal, I wonder if we should extend the precision range. >> > >> > Best, >> > Xingcan >> > >> >>
[jira] [Created] (FLINK-24060) Move ZooKeeperUtilTest to the right place
Aitozi created FLINK-24060:
---------------------------

Summary: Move ZooKeeperUtilTest to the right place
Key: FLINK-24060
URL: https://issues.apache.org/jira/browse/FLINK-24060
Project: Flink
Issue Type: Technical Debt
Components: Tests
Reporter: Aitozi

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation
Brian Zhou created FLINK-24059:
-------------------------------

Summary: SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation
Key: FLINK-24059
URL: https://issues.apache.org/jira/browse/FLINK-24059
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.13.2
Reporter: Brian Zhou
Fix For: 1.14.0

The Pravega Flink connector is implementing the FLIP-27 sources and maps a Pravega reader onto a split, which leads to a one-to-one mapping between source readers and splits. For unit tests, Flink offers the {{SourceReaderTestBase}} class to make testing easier, but its NUM_SPLITS constant is declared {{final}} with a value of 10, which makes it hard for us to integrate.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
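A sketch of the requested change (assuming an overridable hook method named getNumSplits(); this is an illustration of the idea, not the merged API):

{code:java}
/**
 * Illustrative sketch only: replace the final NUM_SPLITS constant with a
 * protected method so a connector with one split per reader can return 1.
 */
public abstract class SourceReaderTestBaseSketch {

    /** Subclasses override this instead of being pinned to a final field. */
    protected int getNumSplits() {
        return 10; // the default value used today
    }

    /** Derived quantities should go through the hook, too. */
    protected int getTotalNumRecords() {
        return getNumSplits() * 10;
    }
}
{code}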
[jira] [Created] (FLINK-24058) TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout fails on azure
Xintong Song created FLINK-24058:
---------------------------------

Summary: TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout fails on azure
Key: FLINK-24058
URL: https://issues.apache.org/jira/browse/FLINK-24058
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Xintong Song
Fix For: 1.15.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23083&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8583

{code}
Aug 30 10:55:16 [ERROR] Tests run: 20, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.243 s <<< FAILURE! - in org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest
Aug 30 10:55:16 [ERROR] testMarkSlotActiveDeactivatesSlotTimeout  Time elapsed: 0.334 s  <<< FAILURE!
Aug 30 10:55:16 java.lang.AssertionError: The slot timeout should have been deactivated.
Aug 30 10:55:16     at org.junit.Assert.fail(Assert.java:89)
Aug 30 10:55:16     at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest.runDeactivateSlotTimeoutTest(TaskSlotTableImplTest.java:509)
Aug 30 10:55:16     at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout(TaskSlotTableImplTest.java:472)
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24057) Flink SQL client Hadoop is not in the classpath/dependencies error even though Hadoop S3 File system plugin was added
James Kim created FLINK-24057:
------------------------------

Summary: Flink SQL client Hadoop is not in the classpath/dependencies error even though Hadoop S3 File system plugin was added
Key: FLINK-24057
URL: https://issues.apache.org/jira/browse/FLINK-24057
Project: Flink
Issue Type: Bug
Components: Table SQL / API, Table SQL / Client
Affects Versions: 1.13.2
Environment: VirtualBox Ubuntu 18.03
Reporter: James Kim

I'm trying to use a CSV file on an S3-compliant object store and query it through the Flink SQL client. As the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins) state, I have added s3.access-key, s3.secret-key, s3.endpoint, and s3.path.style.access to flink-conf.yaml.

In the Flink SQL client I created a table with the connector set to filesystem, the path set to the s3a path, and the format set to csv. However, when I run SELECT * on the table, it hangs for a couple of minutes, so I checked the logs, which show "INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available". According to the docs, adding the Hadoop S3 file system plugin should have taken care of this, but it does not work as expected.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
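For reference, the flink-conf.yaml entries named above would look like this (all values are placeholders):

{code}
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
s3.endpoint: http://<s3-host>:<port>
s3.path.style.access: true
{code}

Per the linked docs, the flink-s3-fs-hadoop jar also needs to be copied from the distribution's opt/ directory into a plugins/s3-fs-hadoop/ directory before the cluster starts.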
Re: Support decimal types with larger precisions
Hi Timo, Though it's an extreme case, I still think this is a hard blocker if we would ingest data from an RDBMS (and other systems supporting large precision numbers). The tricky part is that users can declare numeric types without any precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but in Flink, we must explicitly specify the precision and scale. Cc Jark, do you think this is a problem for flink-cdc-connectors? Best, Xingcan [1] https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313 On Mon, Aug 30, 2021 at 4:12 AM Timo Walther wrote: > Hi Xingcan, > > in theory there should be no hard blocker for supporting this. The > implementation should be flexible enough at most locations. We just > adopted 38 from the Blink code base which adopted it from Hive. > > However, this could be a breaking change for existing pipelines and we > would need to offer a flag to bring back the old behavior. It would > definitely lead to a lot of testing work to not cause inconsistencies. > > Do you think this is a hard blocker for users? > > Regards, > Timo > > > On 28.08.21 00:21, Xingcan Cui wrote: > > Hi all, > > > > Recently, I was trying to load some CDC data from Oracle/Postgres > databases > > and found that the current precision range [1, 38] for DecimalType may > not > > meet the requirement for some source types. For instance, in Oracle, if a > > column is declared as `NUMBER` without precision and scale, the values in > > it could potentially be very large. As DecimalType is backed by Java > > BigDecimal, I wonder if we should extend the precision range. > > > > Best, > > Xingcan > > > >
[jira] [Created] (FLINK-24056) Remove unused ZooKeeperUtilityFactory
Aitozi created FLINK-24056:
---------------------------

Summary: Remove unused ZooKeeperUtilityFactory
Key: FLINK-24056
URL: https://issues.apache.org/jira/browse/FLINK-24056
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: Aitozi

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24055) Deprecate FlinkKafkaConsumer
Fabian Paul created FLINK-24055:
--------------------------------

Summary: Deprecate FlinkKafkaConsumer
Key: FLINK-24055
URL: https://issues.apache.org/jira/browse/FLINK-24055
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul

With the introduction of the KafkaSource (https://issues.apache.org/jira/browse/FLINK-18323) we should deprecate the FlinkKafkaConsumer to hint users to start the migration.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
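For context, migration to the new source roughly looks like this (a sketch based on the KafkaSource builder; the broker address, topic, and group id are placeholders):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceMigration {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Replaces: new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props)
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .print();
        env.execute();
    }
}
{code}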
[jira] [Created] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I
Timo Walther created FLINK-24054:
---------------------------------

Summary: Let SinkUpsertMaterializer emit +U instead of only +I
Key: FLINK-24054
URL: https://issues.apache.org/jira/browse/FLINK-24054
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Reporter: Timo Walther
Assignee: Timo Walther

Currently, {{SinkUpsertMaterializer}} is not able to emit +U's but will always emit +I's. Thus, resulting changelogs are incorrect strictly speaking and only valid when treating +U and +I as similar changes in downstream operators.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
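A toy sketch of the intended emission rule (not the actual SinkUpsertMaterializer internals; the helper and its boolean parameter are illustrative): the first row materialized for a key is an insert, every later one an update-after.

{code:java}
import org.apache.flink.types.RowKind;

/** Illustrative sketch only -- not Flink code. */
final class EmitKindSketch {
    static RowKind emitKind(boolean keyAlreadyMaterialized) {
        return keyAlreadyMaterialized
                ? RowKind.UPDATE_AFTER  // +U for subsequent changes to the key
                : RowKind.INSERT;       // +I only for the first row per key
    }
}
{code}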
[jira] [Created] (FLINK-24053) stop with savepoint timeout
刘方奇 created FLINK-24053:
------------------------

Summary: stop with savepoint timeout
Key: FLINK-24053
URL: https://issues.apache.org/jira/browse/FLINK-24053
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / REST
Affects Versions: 1.13.0, 1.12.0, 1.11.0
Reporter: 刘方奇

Hello, when we use the "stop with savepoint" feature, we always hit a bug: it always takes 5 minutes waiting for the application to end, and then the application throws a timeout exception.

{code:java}
// code placeholder
java.util.concurrent.TimeoutException: null
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036) ~[classes/:?]
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) ~[classes/:?]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:445) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_251]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_251]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_251]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_251]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_251]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_251]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
{code}

We found that org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointStatusHandler.closeHandlerAsync() always runs into this timeout, and its timeout setting is 5 minutes. The handler's close may not be important here, because it serves another handler, org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.StopWithSavepointHandler, which was already closed. So should we skip this close?

PS: There was no problem when we tested code that skips the handler's close.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24052) Flink SQL reads S3 bucket data.
Moses created FLINK-24052:
--------------------------

Summary: Flink SQL reads S3 bucket data.
Key: FLINK-24052
URL: https://issues.apache.org/jira/browse/FLINK-24052
Project: Flink
Issue Type: Improvement
Components: Table SQL / Ecosystem
Reporter: Moses

I want to use Flink SQL to read S3 bucket data, but I found that it ONLY supports absolute paths, which means I cannot read all the content in the bucket. My SQL statements are written as below:

{code:sql}
CREATE TABLE file_data (
    a BIGINT, b STRING, c STRING, d DOUBLE, e BOOLEAN, f DATE, g STRING, h STRING,
    i STRING, j STRING, k STRING, l STRING, m STRING, n STRING, o STRING, p FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket',
    'format' = 'parquet'
);

SELECT COUNT(*) FROM file_data;
{code}

The exception info:
{code:java}
Caused by: java.lang.IllegalArgumentException: path must be absolute
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:68) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:60) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:56) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.S3Guard.putAndReturn(S3Guard.java:149) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
{code}

Is there any solution to meet my requirement?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24051) Make consumer.group-id optional for KafkaSource
Fabian Paul created FLINK-24051:
--------------------------------

Summary: Make consumer.group-id optional for KafkaSource
Key: FLINK-24051
URL: https://issues.apache.org/jira/browse/FLINK-24051
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul

For most users it is not necessary to configure a group-id, and the source itself can provide a meaningful group-id during startup.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
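A sketch of the idea (illustrative only; the fallback prefix and the use of a plain Properties lookup are assumptions, not the proposed implementation):

{code:java}
import java.util.Properties;
import java.util.UUID;

/** Toy sketch: fall back to a generated group id when none is configured. */
final class GroupIdDefaulting {
    static String resolveGroupId(Properties userProps) {
        return userProps.getProperty(
                "group.id",
                "flink-kafka-source-" + UUID.randomUUID());
    }
}
{code}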
[jira] [Created] (FLINK-24050) Support primary keys on metadata columns
Ingo Bürk created FLINK-24050:
------------------------------

Summary: Support primary keys on metadata columns
Key: FLINK-24050
URL: https://issues.apache.org/jira/browse/FLINK-24050
Project: Flink
Issue Type: Improvement
Components: Table SQL / API
Reporter: Ingo Bürk

Currently, primary keys are required to consist solely of physical columns. However, there might be scenarios where the actual payload/records do not contain a suitable primary key, but a unique identifier is available through metadata. In this case it would make sense to define the primary key on such a metadata column:

{code:java}
CREATE TABLE T (
    uid METADATA,
    content STRING,
    PRIMARY KEY (uid) NOT ENFORCED
) WITH (…)
{code}

A simple example for this would be IMAP: there is nothing unique about any single email as a record, but each email in a specific folder on an IMAP server has a unique UID (I'm excluding some irrelevant technical details here).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: Do we still maintain travis build system?
I think Flink 1.10.x used Travis. So I agree with Tison's proposal. +1 for removing the "@flinkbot run travis" from the command documentation. cc @Chesnay Schepler Cheers, Till On Sun, Aug 29, 2021 at 4:48 AM tison wrote: > Hi, > > I can still see "@flinkbot run travis" in flinkbot's toast but it seems we > already migrate to azure > pipeline and this command becomes invalid? If so, let's omit it from the > toast. > > Best, > tison. >
Re: Support decimal types with larger precisions
Hi Xingcan, in theory there should be no hard blocker for supporting this. The implementation should be flexible enough at most locations. We just adopted 38 from the Blink code base which adopted it from Hive. However, this could be a breaking change for existing pipelines and we would need to offer a flag to bring back the old behavior. It would definitely lead to a lot of testing work to not cause inconsistencies. Do you think this is a hard blocker for users? Regards, Timo On 28.08.21 00:21, Xingcan Cui wrote: Hi all, Recently, I was trying to load some CDC data from Oracle/Postgres databases and found that the current precision range [1, 38] for DecimalType may not meet the requirement for some source types. For instance, in Oracle, if a column is declared as `NUMBER` without precision and scale, the values in it could potentially be very large. As DecimalType is backed by Java BigDecimal, I wonder if we should extend the precision range. Best, Xingcan