[jira] [Created] (FLINK-24069) IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery hangs on azure

2021-08-30 Thread Xintong Song (Jira)
Xintong Song created FLINK-24069:


 Summary: 
IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery hangs on azure
 Key: FLINK-24069
 URL: https://issues.apache.org/jira/browse/FLINK-24069
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Xintong Song
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23144&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=12083

{code}
"main" #1 prio=5 os_prio=0 tid=0x7f90bc00b800 nid=0x86d54 waiting on 
condition [0x7f90c4e53000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x80f25d48> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1937)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
at 
org.apache.flink.test.checkpointing.IgnoreInFlightDataITCase.executeIgnoreInFlightDataDuringRecovery(IgnoreInFlightDataITCase.java:136)
at 
org.apache.flink.test.checkpointing.IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery(IgnoreInFlightDataITCase.java:107)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24068) CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint

2021-08-30 Thread Yun Gao (Jira)
Yun Gao created FLINK-24068:
---

 Summary: CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
 Key: FLINK-24068
 URL: https://issues.apache.org/jira/browse/FLINK-24068
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao


{code:java}
04:51:11,727 [Flat Map -> Sink: Unnamed (10/12)#1] WARN org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> Sink: Unnamed (10/12)#1 (0d965fd3c0de11dc7fb6793435cda8ba) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Alignment time is less than zero({}). Is the time monotonic? [-9223369873401849363]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:177)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:253)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:240)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490)
{code}

This is caused by the following sequence:
# SingleCheckpointBarrierHandler first receives a BarrierAnnouncement, which calls checkNewCheckpoint to reset the currentCheckpointId and the alignedChannels.
# SingleCheckpointBarrierHandler then receives an EndOfPartition, which adds the channel to the alignedChannels.
# SingleCheckpointBarrierHandler then receives a barrier, finds that alignedChannels is already non-empty, and therefore skips markAlignmentStart.

We might change step 3 to check whether this is the first barrier received, as in the sketch below.
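A minimal, self-contained sketch of that idea (all names here are assumptions, not Flink's actual code):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Sketch only: guard markAlignmentStart with "has a barrier been seen for this
// checkpoint?" instead of "is alignedChannels empty?", because EndOfPartition
// events also populate alignedChannels before the first barrier arrives.
final class AlignmentTracker {
    private final Set<Integer> alignedChannels = new HashSet<>();
    private boolean barrierSeenForCurrentCheckpoint;

    void onEndOfPartition(int channel) {
        alignedChannels.add(channel); // alignment has not started yet
    }

    void onBarrier(int channel, long checkpointId, long timestamp) {
        if (!barrierSeenForCurrentCheckpoint) { // rather than alignedChannels.isEmpty()
            markAlignmentStart(checkpointId, timestamp);
            barrierSeenForCurrentCheckpoint = true;
        }
        alignedChannels.add(channel);
    }

    private void markAlignmentStart(long checkpointId, long timestamp) {
        // record the alignment start time used later by markAlignmentEnd
    }
}
{code}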



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24067) CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint

2021-08-30 Thread Yun Gao (Jira)
Yun Gao created FLINK-24067:
---

 Summary: CheckpointBarrierHandler may skip the markAlignmentStart for alignment-with-timeout checkpoint
 Key: FLINK-24067
 URL: https://issues.apache.org/jira/browse/FLINK-24067
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao


{code:java}
04:51:11,727 [Flat Map -> Sink: Unnamed (10/12)#1] WARN  org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> Sink: Unnamed (10/12)#1 (0d965fd3c0de11dc7fb6793435cda8ba) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Alignment time is less than zero({}). Is the time monotonic? [-9223369873401849363]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:177)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:253)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:240)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490)
{code}

This is caused by the following sequence:
1. SingleCheckpointBarrierHandler first receives a BarrierAnnouncement, which calls checkNewCheckpoint to reset the currentCheckpointId and the alignedChannels.
2. SingleCheckpointBarrierHandler then receives an EndOfPartition, which adds the channel to the alignedChannels.
3. SingleCheckpointBarrierHandler then receives a barrier, finds that alignedChannels is already non-empty, and therefore skips markAlignmentStart.

We might change step 3 to check whether this is the first barrier received.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24066) Provides a new stop entry for Kubernetes session mode

2021-08-30 Thread liuzhuo (Jira)
liuzhuo created FLINK-24066:
---

 Summary: Provides a new stop entry for Kubernetes session mode
 Key: FLINK-24066
 URL: https://issues.apache.org/jira/browse/FLINK-24066
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: liuzhuo


For the current Native Kubernetes session mode, the way to stop a session is:
{code:java}
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
{code}
or
{code:java}
$ echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dexecution.attached=true
{code}
I think a friendlier interface should be added for stopping a session cluster, 
such as:
{code:java}
$ ./bin/kubernetes-session.sh stop \
-Dkubernetes.cluster-id=my-first-flink-cluster
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24065) Upgrade the TwoPhaseCommitSink to support empty transaction after finished

2021-08-30 Thread Yun Gao (Jira)
Yun Gao created FLINK-24065:
---

 Summary: Upgrade the TwoPhaseCommitSink to support empty 
transaction after finished
 Key: FLINK-24065
 URL: https://issues.apache.org/jira/browse/FLINK-24065
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common, Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao


In https://issues.apache.org/jira/browse/FLINK-23473, the TwoPhaseCommitSink was changed to no longer create new transactions after the operator has finished, so that no transactions are left over once the job finishes. However, with the current implementation of the TwoPhaseCommitSink, the transactions have to be written into state on every checkpoint, and the state does not support a null transaction yet, so this case leads to a NullPointerException.
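A minimal, self-contained illustration of the failure mode (an assumed example, not the actual Flink serializer code):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch only: a state serializer that dereferences the transaction fails once
// the sink stops creating transactions after finishing and the current one is null.
public class NullTransactionDemo {

    static final class Transaction {
        final String id;
        Transaction(String id) { this.id = id; }
    }

    static void serialize(Transaction t, DataOutputStream out) throws IOException {
        out.writeUTF(t.id); // NullPointerException when t is null
    }

    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        serialize(new Transaction("txn-1"), out); // fine while the sink is running
        serialize(null, out);                     // the after-finished case -> NPE
    }
}
{code}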



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24064) HybridSource recovery from savepoint fails

2021-08-30 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-24064:


 Summary: HybridSource recovery from savepoint fails
 Key: FLINK-24064
 URL: https://issues.apache.org/jira/browse/FLINK-24064
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Reporter: Thomas Weise
Assignee: Thomas Weise


Recovery fails because the underlying source and split deserializers are not initialized in the restore code path. This requires deferred deserialization after the current source has been set, along the lines of the sketch below.
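One possible shape of that deferred deserialization, as a hedged sketch (the class and field names are assumptions; SimpleVersionedSerializer is Flink's existing interface):

{code:java}
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Sketch only: keep the split in serialized form until the current underlying
// source -- and therefore its split serializer -- is known, then deserialize.
final class DeferredSplit<SplitT> {
    private final int serializerVersion;
    private final byte[] serializedSplit;

    DeferredSplit(int serializerVersion, byte[] serializedSplit) {
        this.serializerVersion = serializerVersion;
        this.serializedSplit = serializedSplit;
    }

    SplitT deserialize(SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
        return splitSerializer.deserialize(serializerVersion, serializedSplit);
    }
}
{code}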



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support decimal types with larger precisions

2021-08-30 Thread Xingcan Cui
Hi Jark and Jingsong,

Thanks for your reply! Since modifying the SQL type system needs a lot of
work, I agree that we should postpone this until we get more requests from
users.

For my own case, based on domain knowledge, I think a precision of
38 would be enough (though the fields were declared without any precision
constraints). A user-defined numeric type converter would solve the problem!
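For instance, a minimal sketch of such a converter (an assumed helper, not an
existing Flink API) could look like this:

{code:java}
import java.math.BigDecimal;

// Sketch only: carry oversized NUMBER values as STRING/varchar so they survive
// the DECIMAL(38) limit, and convert back only where full numerics are needed.
final class LargeDecimalConverter {

    static String toVarchar(BigDecimal value) {
        // toPlainString keeps full precision and avoids scientific notation
        return value == null ? null : value.toPlainString();
    }

    static BigDecimal fromVarchar(String value) {
        return value == null ? null : new BigDecimal(value);
    }
}
{code}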

Thanks,
Xingcan

On Mon, Aug 30, 2021 at 11:46 PM Jingsong Li  wrote:

> Hi Xingcan,
>
> As a workaround, can we convert large decimal to varchar?
>
> If Flink SQL wants to support large decimal, we should investigate
> other big data and databases. As Jark said, this needs a lot of work.
>
> Best,
> Jingsong Lee
>
> On Tue, Aug 31, 2021 at 11:16 AM Jark Wu  wrote:
> >
> > Hi Xingcan, Timo,
> >
> > Yes, flink-cdc-connector and JDBC connector also don't support larger
> > precision or no precision.
> > However, we didn't receive any users reporting this problem.
> > Maybe it is not very common that precision is higher than 38 or without
> > precision.
> >
> > I think it makes sense to support this use case, but this definitely
> needs
> > a lot of work,
> > and we need more investigation and discussion (maybe a new type?)
> >
> > Best,
> > Jark
> >
> >
> > On Mon, 30 Aug 2021 at 23:32, Xingcan Cui  wrote:
> >
> > > Hi Timo,
> > >
> > > Though it's an extreme case, I still think this is a hard blocker if we
> > > would ingest data from an RDBMS (and other systems supporting large
> > > precision numbers).
> > >
> > > The tricky part is that users can declare numeric types without any
> > > precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]),
> but
> > > in Flink, we must explicitly specify the precision and scale.
> > >
> > > Cc Jark, do you think this is a problem for flink-cdc-connectors?
> > >
> > > Best,
> > > Xingcan
> > >
> > > [1]
> > >
> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
> > >
> > > On Mon, Aug 30, 2021 at 4:12 AM Timo Walther 
> wrote:
> > >
> > >> Hi Xingcan,
> > >>
> > >> in theory there should be no hard blocker for supporting this. The
> > >> implementation should be flexible enough at most locations. We just
> > >> adopted 38 from the Blink code base which adopted it from Hive.
> > >>
> > >> However, this could be a breaking change for existing pipelines and we
> > >> would need to offer a flag to bring back the old behavior. It would
> > >> definitely lead to a lot of testing work to not cause inconsistencies.
> > >>
> > >> Do you think this is a hard blocker for users?
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 28.08.21 00:21, Xingcan Cui wrote:
> > >> > Hi all,
> > >> >
> > >> > Recently, I was trying to load some CDC data from Oracle/Postgres
> > >> databases
> > >> > and found that the current precision range [1, 38] for DecimalType
> may
> > >> not
> > >> > meet the requirement for some source types. For instance, in
> Oracle, if
> > >> a
> > >> > column is declared as `NUMBER` without precision and scale, the
> values
> > >> in
> > >> > it could potentially be very large. As DecimalType is backed by Java
> > >> > BigDecimal, I wonder if we should extend the precision range.
> > >> >
> > >> > Best,
> > >> > Xingcan
> > >> >
> > >>
> > >>
>
>
>
> --
> Best, Jingsong Lee
>


[jira] [Created] (FLINK-24063) Reconsider the behavior of ClusterEntrypoint#startCluster failure handler

2021-08-30 Thread Aitozi (Jira)
Aitozi created FLINK-24063:
--

 Summary: Reconsider the behavior of ClusterEntrypoint#startCluster 
failure handler
 Key: FLINK-24063
 URL: https://issues.apache.org/jira/browse/FLINK-24063
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Aitozi


If runCluster fails, it triggers the STOP_APPLICATION behavior. But consider a case like this:
 # The JobManager encounters a fatal error, such as a network problem, which brings the JobManager process down.
 # A new process is then started by the resource framework (YARN or Kubernetes), but it fails in ClusterEntrypoint#startCluster due to the same network problem.
 # The job then turns into the FAILED status.

This means a streaming job can stop running permanently because of a transient fatal error, which is somewhat fragile. I think we should add a retry mechanism (sketched below) to prevent the job from failing fast twice, so that it can tolerate external errors that persist for a period of time.
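A hedged sketch of what such a retry guard could look like (the method shape and the backoff policy are assumptions, not the actual ClusterEntrypoint code):

{code:java}
import java.time.Duration;

// Sketch only: retry cluster startup a few times before falling back to the
// current STOP_APPLICATION failure handling.
final class RetryingStartup {

    interface Cluster {
        void startCluster() throws Exception; // stands in for ClusterEntrypoint#startCluster
    }

    static void startWithRetry(Cluster cluster, int maxAttempts, Duration backoff) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                cluster.startCluster();
                return;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the existing STOP_APPLICATION handling takes over
                }
                Thread.sleep(backoff.toMillis());
            }
        }
    }
}
{code}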



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24062) Exception encountered during timer serialization in Python DataStream API

2021-08-30 Thread Dian Fu (Jira)
Dian Fu created FLINK-24062:
---

 Summary: Exception encountered during timer serialization in 
Python DataStream API 
 Key: FLINK-24062
 URL: https://issues.apache.org/jira/browse/FLINK-24062
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


For the following example:
{code}

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink,
                                           OutputFileConfig, RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
   "The slings and arrows of outrageous fortune",
   "Or to take arms against a sea of troubles,",
   "And by opposing end them?--To die,--to sleep,--",
   "No more; and by a sleep to say we end",
   "The heartache, and the thousand natural shocks",
   "That flesh is heir to,--'tis a consummation",
   "Devoutly to be wish'd. To die,--to sleep;--",
   "To sleep! perchance to dream:--ay, there's the rub;",
   "For in that sleep of death what dreams may come,",
   "When we have shuffled off this mortal coil,",
   "Must give us pause: there's the respect",
   "That makes calamity of so long life;",
   "For who would bear the whips and scorns of time,",
   "The oppressor's wrong, the proud man's contumely,",
   "The pangs of despis'd love, the law's delay,",
   "The insolence of office, and the spurns",
   "That patient merit of the unworthy takes,",
   "When he himself might his quietus make",
   "With a bare bodkin? who would these fardels bear,",
   "To grunt and sweat under a weary life,",
   "But that the dread of something after death,--",
   "The undiscover'd country, from whose bourn",
   "No traveller returns,--puzzles the will,",
   "And makes us rather bear those ills we have",
   "Than fly to others that we know not of?",
   "Thus conscience does make cowards of us all;",
   "And thus the native hue of resolution",
   "Is sicklied o'er with the pale cast of thought;",
   "And enterprises of great pith and moment,",
   "With this regard, their currents turn awry,",
   "And lose the name of action.--Soft you now!",
   "The fair Ophelia!--Nymph, in thy orisons",
   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()


Re: Support decimal types with larger precisions

2021-08-30 Thread Jingsong Li
Hi Xingcan,

As a workaround, can we convert large decimal to varchar?

If Flink SQL wants to support large decimal, we should investigate
other big data and databases. As Jark said, this needs a lot of work.

Best,
Jingsong Lee

On Tue, Aug 31, 2021 at 11:16 AM Jark Wu  wrote:
>
> Hi Xingcan, Timo,
>
> Yes, flink-cdc-connector and JDBC connector also don't support larger
> precision or no precision.
> However, we didn't receive any users reporting this problem.
> Maybe it is not very common that precision is higher than 38 or without
> precision.
>
> I think it makes sense to support this use case, but this definitely needs
> a lot of work,
> and we need more investigation and discussion (maybe a new type?)
>
> Best,
> Jark
>
>
> On Mon, 30 Aug 2021 at 23:32, Xingcan Cui  wrote:
>
> > Hi Timo,
> >
> > Though it's an extreme case, I still think this is a hard blocker if we
> > would ingest data from an RDBMS (and other systems supporting large
> > precision numbers).
> >
> > The tricky part is that users can declare numeric types without any
> > precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but
> > in Flink, we must explicitly specify the precision and scale.
> >
> > Cc Jark, do you think this is a problem for flink-cdc-connectors?
> >
> > Best,
> > Xingcan
> >
> > [1]
> > https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
> >
> > On Mon, Aug 30, 2021 at 4:12 AM Timo Walther  wrote:
> >
> >> Hi Xingcan,
> >>
> >> in theory there should be no hard blocker for supporting this. The
> >> implementation should be flexible enough at most locations. We just
> >> adopted 38 from the Blink code base which adopted it from Hive.
> >>
> >> However, this could be a breaking change for existing pipelines and we
> >> would need to offer a flag to bring back the old behavior. It would
> >> definitely lead to a lot of testing work to not cause inconsistencies.
> >>
> >> Do you think this is a hard blocker for users?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 28.08.21 00:21, Xingcan Cui wrote:
> >> > Hi all,
> >> >
> >> > Recently, I was trying to load some CDC data from Oracle/Postgres
> >> databases
> >> > and found that the current precision range [1, 38] for DecimalType may
> >> not
> >> > meet the requirement for some source types. For instance, in Oracle, if
> >> a
> >> > column is declared as `NUMBER` without precision and scale, the values
> >> in
> >> > it could potentially be very large. As DecimalType is backed by Java
> >> > BigDecimal, I wonder if we should extend the precision range.
> >> >
> >> > Best,
> >> > Xingcan
> >> >
> >>
> >>



-- 
Best, Jingsong Lee


[jira] [Created] (FLINK-24061) RMQSourceITCase.testAckFailure fails on azure

2021-08-30 Thread Xintong Song (Jira)
Xintong Song created FLINK-24061:


 Summary: RMQSourceITCase.testAckFailure fails on azure
 Key: FLINK-24061
 URL: https://issues.apache.org/jira/browse/FLINK-24061
 Project: Flink
  Issue Type: Bug
  Components: Connectors / RabbitMQ
Affects Versions: 1.15.0
Reporter: Xintong Song
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23141&view=logs&j=ba53eb01-1462-56a3-8e98-0dd97fbcaab5&t=2e426bf0-b717-56bb-ab62-d63086457354&l=14514

{code}
Aug 30 22:30:34 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 93.936 s <<< FAILURE! - in org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
Aug 30 22:30:34 [ERROR] testAckFailure  Time elapsed: 61.713 s  <<< ERROR!
Aug 30 22:30:34 java.util.concurrent.TimeoutException: Condition was not met in given timeout.
Aug 30 22:30:34     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:158)
Aug 30 22:30:34     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:136)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support decimal types with larger precisions

2021-08-30 Thread Jark Wu
Hi Xingcan, Timo,

Yes, flink-cdc-connector and JDBC connector also don't support larger
precision or no precision.
However, we didn't receive any users reporting this problem.
Maybe it is not very common that precision is higher than 38 or without
precision.

I think it makes sense to support this use case, but this definitely needs
a lot of work,
and we need more investigation and discussion (maybe a new type?)

Best,
Jark


On Mon, 30 Aug 2021 at 23:32, Xingcan Cui  wrote:

> Hi Timo,
>
> Though it's an extreme case, I still think this is a hard blocker if we
> would ingest data from an RDBMS (and other systems supporting large
> precision numbers).
>
> The tricky part is that users can declare numeric types without any
> precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but
> in Flink, we must explicitly specify the precision and scale.
>
> Cc Jark, do you think this is a problem for flink-cdc-connectors?
>
> Best,
> Xingcan
>
> [1]
> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
>
> On Mon, Aug 30, 2021 at 4:12 AM Timo Walther  wrote:
>
>> Hi Xingcan,
>>
>> in theory there should be no hard blocker for supporting this. The
>> implementation should be flexible enough at most locations. We just
>> adopted 38 from the Blink code base which adopted it from Hive.
>>
>> However, this could be a breaking change for existing pipelines and we
>> would need to offer a flag to bring back the old behavior. It would
>> definitely lead to a lot of testing work to not cause inconsistencies.
>>
>> Do you think this is a hard blocker for users?
>>
>> Regards,
>> Timo
>>
>>
>> On 28.08.21 00:21, Xingcan Cui wrote:
>> > Hi all,
>> >
>> > Recently, I was trying to load some CDC data from Oracle/Postgres
>> databases
>> > and found that the current precision range [1, 38] for DecimalType may
>> not
>> > meet the requirement for some source types. For instance, in Oracle, if
>> a
>> > column is declared as `NUMBER` without precision and scale, the values
>> in
>> > it could potentially be very large. As DecimalType is backed by Java
>> > BigDecimal, I wonder if we should extend the precision range.
>> >
>> > Best,
>> > Xingcan
>> >
>>
>>


[jira] [Created] (FLINK-24060) Move ZooKeeperUtilTest to right place

2021-08-30 Thread Aitozi (Jira)
Aitozi created FLINK-24060:
--

 Summary: Move ZooKeeperUtilTest to right place
 Key: FLINK-24060
 URL: https://issues.apache.org/jira/browse/FLINK-24060
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation

2021-08-30 Thread Brian Zhou (Jira)
Brian Zhou created FLINK-24059:
--

 Summary: SourceReaderTestBase should allow NUM_SPLITS to be 
overridden in implementation
 Key: FLINK-24059
 URL: https://issues.apache.org/jira/browse/FLINK-24059
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.13.2
Reporter: Brian Zhou
 Fix For: 1.14.0


The Pravega Flink connector is implementing the FLIP-27 sources and maps one Pravega reader to one split, which leads to a one-to-one mapping between source readers and splits. For unit tests, Flink offers the {{SourceReaderTestBase}} class to make testing easier, but its NUM_SPLITS constant is {{final}} with a fixed value of 10, which makes it hard for us to integrate. A sketch of the requested change follows.
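A hedged sketch of the requested change (the accessor name is an assumption, not an actual patch):

{code:java}
// Sketch only: replace the final constant with an overridable accessor so a
// connector with one reader per split can run the test base with a single split.
public abstract class SourceReaderTestBase<SplitT> {

    // previously: protected static final int NUM_SPLITS = 10;
    protected int getNumSplits() {
        return 10; // default keeps the old behavior; subclasses may override
    }
}
{code}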



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24058) TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout fails on azure

2021-08-30 Thread Xintong Song (Jira)
Xintong Song created FLINK-24058:


 Summary: 
TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout fails on azure
 Key: FLINK-24058
 URL: https://issues.apache.org/jira/browse/FLINK-24058
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Xintong Song
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23083&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8583

{code}
Aug 30 10:55:16 [ERROR] Tests run: 20, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.243 s <<< FAILURE! - in org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest
Aug 30 10:55:16 [ERROR] testMarkSlotActiveDeactivatesSlotTimeout  Time elapsed: 0.334 s  <<< FAILURE!
Aug 30 10:55:16 java.lang.AssertionError: The slot timeout should have been deactivated.
Aug 30 10:55:16     at org.junit.Assert.fail(Assert.java:89)
Aug 30 10:55:16     at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest.runDeactivateSlotTimeoutTest(TaskSlotTableImplTest.java:509)
Aug 30 10:55:16     at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImplTest.testMarkSlotActiveDeactivatesSlotTimeout(TaskSlotTableImplTest.java:472)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24057) Flink SQL client Hadoop is not in the classpath/dependencies error even though Hadoop S3 File system plugin was added

2021-08-30 Thread James Kim (Jira)
James Kim created FLINK-24057:
-

 Summary: Flink SQL client Hadoop is not in the classpath/dependencies error even though Hadoop S3 File system plugin was added
 Key: FLINK-24057
 URL: https://issues.apache.org/jira/browse/FLINK-24057
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Client
Affects Versions: 1.13.2
 Environment: VirtualBox Ubuntu 18.03 
Reporter: James Kim


I'm trying to use a CSV file on an S3-compliant object store and query it through the Flink SQL client.

As the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins) state, I have added the s3.access-key, s3.secret-key, s3.endpoint, and s3.path.style.access settings to flink-conf.yaml.
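For reference, the added entries look like the following (all values here are placeholders):

{code}
s3.access-key: <access-key>
s3.secret-key: <secret-key>
s3.endpoint: http://<object-store-host>:<port>
s3.path.style.access: true
{code}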

I then ran the Flink SQL client and created a table with the connector set to filesystem, the path set to the s3a path, and the format set to csv.

However, when I run select * on the table, it hangs for a couple of minutes, so I checked the logs, which show "INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available". According to the docs, adding the Hadoop S3 File System plugin should have taken care of this, but it does not work as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support decimal types with larger precisions

2021-08-30 Thread Xingcan Cui
Hi Timo,

Though it's an extreme case, I still think this is a hard blocker if we
would ingest data from an RDBMS (and other systems supporting large
precision numbers).

The tricky part is that users can declare numeric types without any
precision and scale restrictions in RDBMS (e.g., NUMBER in Oracle[1]), but
in Flink, we must explicitly specify the precision and scale.

Cc Jark, do you think this is a problem for flink-cdc-connectors?

Best,
Xingcan

[1]
https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313

On Mon, Aug 30, 2021 at 4:12 AM Timo Walther  wrote:

> Hi Xingcan,
>
> in theory there should be no hard blocker for supporting this. The
> implementation should be flexible enough at most locations. We just
> adopted 38 from the Blink code base which adopted it from Hive.
>
> However, this could be a breaking change for existing pipelines and we
> would need to offer a flag to bring back the old behavior. It would
> definitely lead to a lot of testing work to not cause inconsistencies.
>
> Do you think this is a hard blocker for users?
>
> Regards,
> Timo
>
>
> On 28.08.21 00:21, Xingcan Cui wrote:
> > Hi all,
> >
> > Recently, I was trying to load some CDC data from Oracle/Postgres
> databases
> > and found that the current precision range [1, 38] for DecimalType may
> not
> > meet the requirement for some source types. For instance, in Oracle, if a
> > column is declared as `NUMBER` without precision and scale, the values in
> > it could potentially be very large. As DecimalType is backed by Java
> > BigDecimal, I wonder if we should extend the precision range.
> >
> > Best,
> > Xingcan
> >
>
>


[jira] [Created] (FLINK-24056) Remove unused ZooKeeperUtilityFactory

2021-08-30 Thread Aitozi (Jira)
Aitozi created FLINK-24056:
--

 Summary: Remove unused ZooKeeperUtilityFactory
 Key: FLINK-24056
 URL: https://issues.apache.org/jira/browse/FLINK-24056
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24055) Deprecate FlinkKafkaConsumer

2021-08-30 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24055:
---

 Summary: Deprecate FlinkKafkaConsumer
 Key: FLINK-24055
 URL: https://issues.apache.org/jira/browse/FLINK-24055
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


With the introduction of the KafkaSource (https://issues.apache.org/jira/browse/FLINK-18323), we should deprecate the FlinkKafkaConsumer to signal users to start the migration. A sketch of the change follows.
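A minimal sketch of the proposed change (the javadoc wording is an assumption; constructors and the class body are elided):

{code:java}
/**
 * @deprecated Use {@link org.apache.flink.connector.kafka.source.KafkaSource}
 *             instead.
 */
@Deprecated
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
    // ... existing constructors and implementation left unchanged
}
{code}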



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I

2021-08-30 Thread Timo Walther (Jira)
Timo Walther created FLINK-24054:


 Summary: Let SinkUpsertMaterializer emit +U instead of only +I
 Key: FLINK-24054
 URL: https://issues.apache.org/jira/browse/FLINK-24054
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Timo Walther
Assignee: Timo Walther


Currently, {{SinkUpsertMaterializer}} is not able to emit +U's and will always emit +I's. Strictly speaking, the resulting changelogs are therefore incorrect and only valid when downstream operators treat +U and +I as equivalent changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24053) stop with savepoint timeout

2021-08-30 Thread Jira
刘方奇 created FLINK-24053:
---

 Summary: stop with savepoint timeout
 Key: FLINK-24053
 URL: https://issues.apache.org/jira/browse/FLINK-24053
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / REST
Affects Versions: 1.13.0, 1.12.0, 1.11.0
Reporter: 刘方奇


Hello, when we use the "stop with savepoint" feature, we always hit a bug.

We always spend 5 minutes waiting for the application to end, and then the application throws a timeout exception.

 
{code:java}
// code placeholder
java.util.concurrent.TimeoutException: null
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036) ~[classes/:?]
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) ~[classes/:?]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:445) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_251]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_251]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_251]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_251]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_251]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_251]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
{code}
We found that it is always the function org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointStatusHandler.closeHandlerAsync() that runs into this timeout, and its timeout setting is 5 minutes.

One question is whether closing this handler matters at all, because it serves another handler, org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.StopWithSavepointHandler, which is already closed. So should we skip this close?

PS: There was no problem when we tested code that skips the handler's close.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24052) Flink SQL reads S3 bucket data.

2021-08-30 Thread Moses (Jira)
Moses created FLINK-24052:
-

 Summary: Flink SQL reads S3 bucket data.
 Key: FLINK-24052
 URL: https://issues.apache.org/jira/browse/FLINK-24052
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Reporter: Moses


I want to use Flink SQL to read data from an S3 bucket, but I found that it ONLY supports absolute paths, which means I cannot read all of the content in the bucket.
My SQL statements are as follows:
{code:sql}
CREATE TABLE file_data (
    a BIGINT, b STRING, c STRING, d DOUBLE, e BOOLEAN, f DATE, g STRING, h STRING,
    i STRING, j STRING, k STRING, l STRING, m STRING, n STRING, o STRING, p FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket',
    'format' = 'parquet'
);

SELECT COUNT(*) FROM file_data;
{code}
The exception info:


{code:java}
Caused by: java.lang.IllegalArgumentException: path must be absolute
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:68) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:60) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.PathMetadata.<init>(PathMetadata.java:56) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
    at org.apache.hadoop.fs.s3a.s3guard.S3Guard.putAndReturn(S3Guard.java:149) ~[flink-s3-fs-hadoop-1.13.1.jar:1.13.1]
{code}

Is there any solution that meets my requirement?






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24051) Make consumer.group-id optional for KafkaSource

2021-08-30 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24051:
---

 Summary: Make consumer.group-id optional for KafkaSource
 Key: FLINK-24051
 URL: https://issues.apache.org/jira/browse/FLINK-24051
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


For most users it is not necessary to provide a group-id, and the source itself can generate a meaningful group-id during startup, e.g. as sketched below.
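A hedged sketch of such a fallback (the naming scheme and the property handling are assumptions):

{code:java}
import java.util.Properties;
import java.util.UUID;

// Sketch only: fall back to a generated group id when none was configured.
final class GroupIdDefaults {

    static void ensureGroupId(Properties consumerProps) {
        if (consumerProps.getProperty("group.id") == null) {
            consumerProps.setProperty("group.id", "flink-kafka-source-" + UUID.randomUUID());
        }
    }
}
{code}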



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24050) Support primary keys on metadata columns

2021-08-30 Thread Jira
Ingo Bürk created FLINK-24050:
-

 Summary: Support primary keys on metadata columns
 Key: FLINK-24050
 URL: https://issues.apache.org/jira/browse/FLINK-24050
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Ingo Bürk


Currently, primary keys are required to consist solely of physical columns. 
However, there might be scenarios where the actual payload/records do not 
contain a suitable primary key, but a unique identifier is available through 
metadata. In this case it would make sense to define the primary key on such a 
metadata column:
{code:java}
CREATE TABLE T (
  uid STRING METADATA,
  content STRING,
  PRIMARY KEY (uid) NOT ENFORCED
) WITH (…)
{code}
A simple example for this would be IMAP: there is nothing unique about any 
single email as a record, but each email in a specific folder on an IMAP server 
has a unique UID (I'm excluding some irrelevant technical details here).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Do we still maintain travis build system?

2021-08-30 Thread Till Rohrmann
I think Flink 1.10.x used Travis. So I agree with Tison's proposal. +1 for
removing the "@flinkbot run travis" from the command documentation.

cc @Chesnay Schepler 

Cheers,
Till

On Sun, Aug 29, 2021 at 4:48 AM tison  wrote:

> Hi,
>
> I can still see "@flinkbot run travis" in flinkbot's toast but it seems we
> already migrate to azure
> pipeline and this command becomes invalid? If so, let's omit it from the
> toast.
>
> Best,
> tison.
>


Re: Support decimal types with larger precisions

2021-08-30 Thread Timo Walther

Hi Xingcan,

in theory there should be no hard blocker for supporting this. The 
implementation should be flexible enough at most locations. We just 
adopted 38 from the Blink code base which adopted it from Hive.


However, this could be a breaking change for existing pipelines and we 
would need to offer a flag to bring back the old behavior. It would 
definitely lead to a lot of testing work to not cause inconsistencies.


Do you think this is a hard blocker for users?

Regards,
Timo


On 28.08.21 00:21, Xingcan Cui wrote:

Hi all,

Recently, I was trying to load some CDC data from Oracle/Postgres databases
and found that the current precision range [1, 38] for DecimalType may not
meet the requirement for some source types. For instance, in Oracle, if a
column is declared as `NUMBER` without precision and scale, the values in
it could potentially be very large. As DecimalType is backed by Java
BigDecimal, I wonder if we should extend the precision range.

Best,
Xingcan