[jira] [Created] (FLINK-22844) Add doc to introduce ExplainDetails for EXPLAIN sytnax

2021-06-01 Thread WeiNan Zhao (Jira)
WeiNan Zhao created FLINK-22844:
---

 Summary: Add doc to introduce ExplainDetails for EXPLAIN sytnax 
 Key: FLINK-22844
 URL: https://issues.apache.org/jira/browse/FLINK-22844
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.14.0
Reporter: WeiNan Zhao
 Fix For: 1.14.0


Link to FLINK_20562,add doc to introduct this new sytax.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22843) Document and code are inconsistent.

2021-06-01 Thread xiaozilong (Jira)
xiaozilong created FLINK-22843:
--

 Summary: Document and code are inconsistent.
 Key: FLINK-22843
 URL: https://issues.apache.org/jira/browse/FLINK-22843
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.0
Reporter: xiaozilong


code: 
[https://github.com/apache/flink/blob/abd321c04d87799800d0b0dada9334fd46f99960/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java#L103]

docment: 
[https://github.com/apache/flink/blob/master/docs/content/docs/connectors/table/hbase.md#lookupcachemax-rows]

The type of configuration `lookup.cache.max-rows` should be Long instead of 
Integer and the defaule value maybe the defaule value is wrong also.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22842) Streaming File Sink end-to-end test fail due to not finish after 900s

2021-06-01 Thread Xintong Song (Jira)
Xintong Song created FLINK-22842:


 Summary: Streaming File Sink end-to-end test fail due to not 
finish after 900s
 Key: FLINK-22842
 URL: https://issues.apache.org/jira/browse/FLINK-22842
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.14.0
Reporter: Xintong Song


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18525=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529=13152



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22841) how to update configuration of the native kubernetes deployment model flink cluster?

2021-06-01 Thread lukehua (Jira)
lukehua created FLINK-22841:
---

 Summary: how to update configuration of the native kubernetes 
deployment model flink cluster?
 Key: FLINK-22841
 URL: https://issues.apache.org/jira/browse/FLINK-22841
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.13.0
 Environment: flink 1.30

k8s 1.16.9

docker image flink:1.13.0
Reporter: lukehua


when i deploy flink with the native session model ,how can i update the 
configuration of the flink cluster?  i can use the script kubernetes-session.sh 
 to change the flink-conf.yaml, but the deployment of jobmanager does not 
restart the pod automatically,and the taskmanager pod does not restart one by 
one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22839) Assign-evenly kafkaTopicPartitions of multiple topics to flinkKafkaConsumer subtask

2021-06-01 Thread Xu xiaolong (Jira)
Xu xiaolong created FLINK-22839:
---

 Summary: Assign-evenly kafkaTopicPartitions of multiple topics to 
flinkKafkaConsumer subtask 
 Key: FLINK-22839
 URL: https://issues.apache.org/jira/browse/FLINK-22839
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.11.0
Reporter: Xu xiaolong


Now ,with flink1.11 kafka connecotr,when we consume multiple kafka topics by 
one flinkkafkaconsumer , when we set the consumer parallelism equals with the 
total partitions count of multiple topic , make a decision each topic partition 
consume by one kafka consumer, so each topic partition count is less than the 
subtask count. But,the problem is that currently, some subtask is total free 
while someothers workload is very high, this problem is caused by that the 
partitionAssigner assign partion of earch topic indepently currently.

Following is one example: Target topics: topi1, topic2 ,topic3 ,topic4.  each 
has 3 partitions. In our job we consume the 4 topic by one consumer , our flink 
standalone cluster got 9 taskworkers on different nodes. we want balance the 
workload as much as possible, so we  set the paralelism of flinkkafkaconsumer 
to 12. from the UI we notice that the 0-5 subtask is free without partition 
assigned, the total 12 partiton is assigned to 6-11 subtask. We learned the 
source code of KafkaTopicPartitionAssigner to explain this phenomenon ,and then 
we extend one more partition assign strategy which can deal with the need we 
describe up, this stategy can evenly assign partiton from multiple topic 
grobally to subtask of consumer. we want to contibute to flink, so someone has 
the same requirement can use it directlly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22840) Assign-evenly kafkaTopicPartitions of multiple topics to flinkKafkaConsumer subtask

2021-06-01 Thread Xu xiaolong (Jira)
Xu xiaolong created FLINK-22840:
---

 Summary: Assign-evenly kafkaTopicPartitions of multiple topics to 
flinkKafkaConsumer subtask 
 Key: FLINK-22840
 URL: https://issues.apache.org/jira/browse/FLINK-22840
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.11.0
Reporter: Xu xiaolong


Now ,with flink1.11 kafka connecotr,when we consume multiple kafka topics by 
one flinkkafkaconsumer , when we set the consumer parallelism equals with the 
total partitions count of multiple topic , make a decision each topic partition 
consume by one kafka consumer, so each topic partition count is less than the 
subtask count. But,the problem is that currently, some subtask is total free 
while someothers workload is very high, this problem is caused by that the 
partitionAssigner assign partion of earch topic indepently currently.

Following is one example: Target topics: topi1, topic2 ,topic3 ,topic4.  each 
has 3 partitions. In our job we consume the 4 topic by one consumer , our flink 
standalone cluster got 9 taskworkers on different nodes. we want balance the 
workload as much as possible, so we  set the paralelism of flinkkafkaconsumer 
to 12. from the UI we notice that the 0-5 subtask is free without partition 
assigned, the total 12 partiton is assigned to 6-11 subtask. We learned the 
source code of KafkaTopicPartitionAssigner to explain this phenomenon ,and then 
we extend one more partition assign strategy which can deal with the need we 
describe up, this stategy can evenly assign partiton from multiple topic 
grobally to subtask of consumer. we want to contibute to flink, so someone has 
the same requirement can use it directlly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend Configuration Proposal

2021-06-01 Thread Yun Tang
Hi Yuan, thanks for launching this discussion.

I prefer option-3 as this is the easiest to understand for users.


Best
Yun Tang

From: Roman Khachatryan 
Sent: Monday, May 31, 2021 16:53
To: dev 
Subject: Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend 
Configuration Proposal

Hey Yuan, thanks for the proposal

I think Option 3 is the simplest to use and exposes less details than any other.
It's also consistent with the current way of configuring state
backends, as long as we treat change logging as a common feature
applicable to any state backend, like e.g.
state.backend.local-recovery.

Option 6 seems slightly less preferable as it exposes more details but
I think is the most viable alternative.

Regards,
Roman


On Mon, May 31, 2021 at 8:39 AM Yuan Mei  wrote:
>
> Hey all,
>
> We would like to start a discussion on how to enable/config Changelog
> Statebakcend.
>
> As part of FLIP-158[1], Changelog state backend wraps on top of existing
> state backend (HashMapStateBackend, EmbeddedRocksDBStateBackend and may
> expect more) and delegates state changes to the underlying state backends.
> This thread is to discuss the problem of how Changelog StateBackend should
> be enabled and configured.
>
> Proposed options to enable/config state changelog is listed below:
>
> Option 1: Enable Changelog Statebackend through a Boolean Flag
>
> Option 2: Enable Changelog Statebackend through a Boolean Flag + a Special
> Case
>
> Option 3: Enable Changelog Statebackend through a Boolean Flag + W/O
> ChangelogStateBackend Exposed
>
> Option 4: Explicit Nested Configuration + “changelog.inner” prefix for
> inner backend
>
> Option 5: Explicit Nested Configuration + inner state backend configuration
> unchanged
>
> Option 6: Config Changelog and Inner Statebackend All-Together
>
> Details of each option can be found here:
> https://docs.google.com/document/d/13AaCf5fczYTDHZ4G1mgYL685FqbnoEhgo0cdwuJlZmw/edit?usp=sharing
>
> When considering these options, please consider these four dimensions:
> 1 Consistency
> API/config should follow a consistent model and should not have
> contradicted logic beneath
> 2 Simplicity
> API should be easy to use and not introduce too much burden on users
> 3. Explicity
> API/config should not contain implicit assumptions and should be intuitive
> to users
> 4. Extensibility
> With foreseen future, whether the current setting can be easily extended
>
> Please let us know what do you think and please keep the discussion in this
> mailing thread.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
>
> Best
> Yuan


[jira] [Created] (FLINK-22838) Flink Dashboard display incorrect Version in 1.13,actual display 1.12.2

2021-06-01 Thread Gao Fei (Jira)
Gao Fei created FLINK-22838:
---

 Summary: Flink Dashboard display incorrect Version in 1.13,actual 
display 1.12.2
 Key: FLINK-22838
 URL: https://issues.apache.org/jira/browse/FLINK-22838
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.1, 1.13.0
Reporter: Gao Fei
 Fix For: 1.13.2


Flink Dashboard display incorrect Version in 1.13.1,actual display 1.12.2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22837) Assign-evenly kafkaTopicPartitions of multiple topics to flinkKafkaConsumer subtask

2021-06-01 Thread Xu xiaolong (Jira)
Xu xiaolong created FLINK-22837:
---

 Summary: Assign-evenly kafkaTopicPartitions of multiple topics to 
flinkKafkaConsumer subtask 
 Key: FLINK-22837
 URL: https://issues.apache.org/jira/browse/FLINK-22837
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.11.0
Reporter: Xu xiaolong


 Now ,with flink1.11 kafka connecotr,when we consume multiple kafka topics by 
one flinkkafkaconsumer , when we set the consumer parallelism equals with the 
total partitions count of multiple topic , make a decision each topic partition 
consume by one kafka consumer, so each topic partition count is less than the 
subtask count. But,the problem is that currently, some subtask is total free 
while someothers workload is very high, this problem is caused by that the 
partitionAssigner assign partion of earch topic indepently currently.

Following is one example:

Target topics: topi1, topic2 ,topic3 ,topic4.  each has 3 partitions.

In our job we consume the 4 topic by one consumer , our flink standalone 
cluster got 9 taskworkers on different nodes. we want balance the workload as 
much as possible, so we 

set the paralelism of flinkkafkaconsumer to 12. from the UI we notice that the 
0-5 subtask is free without partition assigned, the total 12 partiton is 
assigned to 6-11 subtask. We learned the source code of 
KafkaTopicPartitionAssigner and extend one more partition assign strategy that 
can deal with the need we describe up, this stategy can evenly assign partiton 
from multiple topic grobally to subtask of consumer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22836) OffsetsInitializerTest.testTimestampOffsetsInitializer fails on azure

2021-06-01 Thread Xintong Song (Jira)
Xintong Song created FLINK-22836:


 Summary: OffsetsInitializerTest.testTimestampOffsetsInitializer 
fails on azure
 Key: FLINK-22836
 URL: https://issues.apache.org/jira/browse/FLINK-22836
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.13.1
Reporter: Xintong Song


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18510=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6503


{code:java}
Jun 01 13:51:03 java.lang.AssertionError: expected:<9> but was:<10>
Jun 01 13:51:03 at org.junit.Assert.fail(Assert.java:88)
Jun 01 13:51:03 at org.junit.Assert.failNotEquals(Assert.java:834)
Jun 01 13:51:03 at org.junit.Assert.assertEquals(Assert.java:645)
Jun 01 13:51:03 at org.junit.Assert.assertEquals(Assert.java:631)
Jun 01 13:51:03 at 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerTest.lambda$testTimestampOffsetsInitializer$1(OffsetsInitializerTest.java:110)
Jun 01 13:51:03 at java.util.HashMap.forEach(HashMap.java:1289)
Jun 01 13:51:03 at 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerTest.testTimestampOffsetsInitializer(OffsetsInitializerTest.java:107)
Jun 01 13:51:03 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jun 01 13:51:03 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jun 01 13:51:03 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun 01 13:51:03 at java.lang.reflect.Method.invoke(Method.java:498)
Jun 01 13:51:03 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
Jun 01 13:51:03 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Jun 01 13:51:03 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
Jun 01 13:51:03 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
Jun 01 13:51:03 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
Jun 01 13:51:03 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
Jun 01 13:51:03 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Jun 01 13:51:03 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Jun 01 13:51:03 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
Jun 01 13:51:03 at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
Jun 01 13:51:03 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
Jun 01 13:51:03 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
Jun 01 13:51:03 at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
Jun 01 13:51:03 at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
Jun 01 13:51:03 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
Jun 01 13:51:03 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
Jun 01 13:51:03 at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-01 Thread Steven Wu
discussed the PR with Thosmas offline. Thomas, please correct me if I
missed anything.

Right now, the PR differs from the FLIP-150 doc regarding the converter.
* Current PR uses the enumerator checkpoint state type as the input for the
converter
* FLIP-150 defines a new EndStateT interface.
It seems that the FLIP-150 approach of EndStateT is more flexible, as
transition EndStateT doesn't have to be included in the upstream source
checkpoint state.

Let's look at two use cases:
1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 pm,
then Kafka source starts with initial position of 5 pm. In this case, there
is no need for converter or EndStateT since the starting time for Kafka
source is known and fixed.
2) dynamic cutover time at 1 hour before now. This is useful when the
bootstrap of historic data takes a long time (like days or weeks) and we
don't know the exact time of cutover when a job is launched. Instead, we
are instructing the file source to stop when it gets close to live data. In
this case, hybrid source construction will specify a relative time (now - 1
hour), the EndStateT (of file source) will be resolved to an absolute time
for cutover. We probably don't need to include EndStateT (end timestamp) as
the file source checkpoint state. Hence, the separate EndStateT is probably
more desirable.

We also discussed the converter for the Kafka source. Kafka source supports
different OffsetsInitializer impls (including TimestampOffsetsInitializer).
To support the dynamic cutover time (use case #2 above), we can plug in a
SupplierTimestampOffsetInitializer, where the starting timestamp is not set
during source/job construction. Rather it is a supplier model where the
starting timestamp value is set to the resolved absolute timestamp during
switch.

Thanks,
Steven



On Thu, May 20, 2021 at 8:59 PM Thomas Weise  wrote:

> Hi Nicholas,
>
> Thanks for taking a look at the PR!
>
> 1. Regarding switching mechanism:
>
> There has been previous discussion in this thread regarding the pros
> and cons of how the switching can be exposed to the user.
>
> With fixed start positions, no special switching interface to transfer
> information between enumerators is required. Sources are configured as
> they would be when used standalone and just plugged into HybridSource.
> I expect that to be a common use case. You can find an example for
> this in the ITCase:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
>
> For dynamic start position, the checkpoint state is used to transfer
> information from old to new enumerator. An example for that can be
> found here:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
>
> That may look verbose, but the code to convert from one state to
> another can be factored out into a utility and the function becomes a
> one-liner.
>
> For common sources like files and Kafka we can potentially (later)
> implement the conversion logic as part of the respective connector's
> checkpoint and split classes.
>
> I hope that with the PR up for review, we can soon reach a conclusion
> on how we want to expose this to the user.
>
> Following is an example for Files -> Files -> Kafka that I'm using for
> e2e testing. It exercises both ways of setting the start position.
>
> https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
>
>
> 2. Regarding the events used to implement the actual switch between
> enumerator and readers: I updated the PR with javadoc to clarify the
> intent. Please let me know if that helps or let's continue to discuss
> those details on the PR?
>
>
> Thanks,
> Thomas
>
>
> On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang 
> wrote:
> >
> > Hi Thomas,
> >
> >Sorry for later reply for your POC. I have reviewed the based abstract
> > implementation of your pull request:
> > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > mechanism, this level of abstraction is not concise enough, which doesn't
> > make connector contribution easier. In theory, it is necessary to
> introduce
> > a set of interfaces to support the switching mechanism. The
> SwitchableSource
> > and SwitchableSplitEnumerator interfaces are needed for connector
> > expansibility.
> >In other words, the whole switching process of above mentioned PR is
> > different from that mentioned in FLIP-150. In the above implementation,
> the
> > source reading switching is executed after receving the
> SwitchSourceEvent,
> > which could be before the sending SourceReaderFinishEvent. This timeline
> of
> > source reading switching could be discussed here.
> >@Stephan @Becket, if you are available, please help to review the
> > abstract implementation, and compare with the interfaces mentioned in
> > FLIP-150.
> >
> > Thanks,
> > Nicholas Jiang
> >
> >
> >
> > --
> > Sent from:
> 

[VOTE] Watermark propagation with Sink API

2021-06-01 Thread Eron Wright
After some good discussion about how to enhance the Sink API to process
watermarks, I believe we're ready to proceed with a vote.  Voting will be
open until at least Friday, June 4th, 2021.

Reference:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Discussion thread:
https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E

Implementation Issue:
https://issues.apache.org/jira/browse/FLINK-22700

Thanks,
Eron Wright
StreamNative


[jira] [Created] (FLINK-22835) ChangelogStateBackend tests use nested backend on recovery

2021-06-01 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-22835:
-

 Summary: ChangelogStateBackend tests use nested backend on recovery
 Key: FLINK-22835
 URL: https://issues.apache.org/jira/browse/FLINK-22835
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Tests
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.14.0


For example, ChangelogDelegateMemoryStateBackendTest overrides 
createKeyedBackend() but does NOT getStateBackend(). The latter is being used 
on recovery.

cc: @ym 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The partition tracker should support remote shuffle properly

2021-06-01 Thread XING JIN
Hi, Till ~ Thanks for your comments. Yes, I think we are on the same page –
we are discussing how should JMPartitionTracker manage partitions properly ~

> If the ShuffleMaster should lose a result partition, then a reading task
should fail with a PartitionException which will invalidate the partition
on the JobMasterPartitionTracker so that it is reproduced.

1. True, the reproduction of a lost upstream partition could be triggered
by the shuffle-read failure of downstream. But it tends to be unacceptable
in the production environment for batch processing jobs. Say there are 100
upstream partitions lost due to failure of an external shuffle worker, if
there's no notification from ShuffleMaster to JMPartitionTracker, 100
PartitionExceptions on downstream will happen and upstream partitions will
be reproduced one by one sequentially. The time overhead will be
unacceptable. From this point of view, I tend to think ShuffleMaster should
have the ability to unregister partitions by locationID on
JMPartitionTracker.

> I think that deciding whether the partition is stored externally or not
can be answered by using the ShuffleDescriptor.storesLocalResourcesOn
method.

2. Yes, I agree it works (though it's not easy to access ShuffleDescriptor
by tmID from JMPartitionTracker at this moment). But if we agree that my
first point is valid, JMPartitionTracker should maintain the index from
locationID to partition. Then it will be straightforward to check whether a
partition is accommodated on remote by comparing its tmID and locationID;

Best,
Jin

Till Rohrmann  于2021年6月1日周二 下午4:59写道:

> Hi Jin,
>
> thanks for starting this discussion and the initiative to implement a
> remote shuffle service. It has always been the idea of the ShuffleService
> abstraction to make this possible and we probably have overlooked some
> details.
>
> What I have understood from your description, you would like to introduce
> the locationID which points to the location where the result partition is
> stored (potentially external). Using the locationID and the tmID it is
> possible to say whether the partition is stored externally or not.
>
> I think that deciding whether the partition is stored externally or not (or
> more precisely whether the partition occupies resources on the TM) can be
> answered by using the ShuffleDescriptor.storesLocalResourcesOn method. If
> it returns some ResourceID then we have to tell the TM about the release.
> If not, then we only tell the shuffle master about the partition release.
> How the data can be accessed on the external system is then encapsulated by
> the ShuffleMaster and the ShuffleDescriptor. The logic for releasing the
> partitions on the TMs and the ShuffleMaster should already be implemented
> in the JobMasterPartitionTrackerImpl.
>
> I think what we need to change is that we don't stop the tracking of
> completed partitions when a TM on which the producers run disconnects and
> if we store the result partition externally. This is required to make
> partitions survive in case of TM failures. What this also requires is to
> distinguish between finished and in-progress partitions.
>
> What indeed is currently not implemented is the channel from the
> ShuffleMaster to the JobMasterPartitionTrackerImpl. This is, however, not a
> big problem atm. If the ShuffleMaster should lose a result partition, then
> a reading task should fail with a PartitionException which will invalidate
> the partition on the JMPartitionTracker so that it is reproduced. Listening
> to the ShuffleMaster would be an optimization to learn more quickly about
> this fact and to avoid a restart cycle.
>
> Did I understand you correctly, Jin, and do my comments make sense?
>
> Cheers,
> Till
>
> On Wed, May 26, 2021 at 5:52 AM XING JIN  wrote:
>
> > Hi devs ~
> > Recently our team designed and started to build Flink remote shuffle
> > service based on 'pluggable shuffle service framework'[1] for batch
> > processing jobs. We found some potential enhancements could be made on
> > 'pluggable shuffle service' and created an umbrella JIRA[2]. I raise this
> > DISCUSSION and want to hear broader feedback / comments on one ticket [3]
> > -- "The partition tracker should support remote shuffle properly".
> >
> > In current Flink, data partition is bound with the ResourceID of TM in
> > Execution#startTrackingPartitions and JM partition tracker will stop
> > tracking corresponding partitions when a TM
> > disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of
> shuffle
> > data is bound with computing resource (TM). It works fine for internal
> > shuffle service, but doesn't for remote shuffle service. Note that
> shuffle
> > data is accommodated on remote, the lifecycle of a completed partition is
> > capable to be decoupled with TM, i.e. TM is totally fine to be released
> > when no computing task is on it and further shuffle reading requests
> could
> > be directed to remote shuffle cluster. In addition, when a TM is 

[jira] [Created] (FLINK-22834) KafkaITCase.testBigRecordJob times out on azure

2021-06-01 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22834:


 Summary:  KafkaITCase.testBigRecordJob times out on azure
 Key: FLINK-22834
 URL: https://issues.apache.org/jira/browse/FLINK-22834
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18481=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6992

{code}
java.lang.InterruptedException
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:49)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBigRecordTestTopology(KafkaConsumerTestBase.java:1471)
at 
org.apache.flink.streaming.connectors.kafka.KafkaITCase.testBigRecordJob(KafkaITCase.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Jun 01 07:06:39 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 214.086 s <<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.KafkaITCase
Jun 01 07:06:39 [ERROR] 
testBigRecordJob(org.apache.flink.streaming.connectors.kafka.KafkaITCase)  Time 
elapsed: 60.015 s  <<< ERROR!
Jun 01 07:06:39 org.junit.runners.model.TestTimedOutException: test timed out 
after 6 milliseconds
Jun 01 07:06:39 at sun.misc.Unsafe.park(Native Method)
Jun 01 07:06:39 at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Jun 01 07:06:39 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
Jun 01 07:06:39 at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
Jun 01 07:06:39 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
Jun 01 07:06:39 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
Jun 01 07:06:39 at 
org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:49)
Jun 01 07:06:39 at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBigRecordTestTopology(KafkaConsumerTestBase.java:1471)
Jun 01 07:06:39 at 
org.apache.flink.streaming.connectors.kafka.KafkaITCase.testBigRecordJob(KafkaITCase.java:119)
Jun 01 07:06:39 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jun 01 07:06:39 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jun 01 07:06:39 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun 01 07:06:39 at java.lang.reflect.Method.invoke(Method.java:498)
Jun 01 07:06:39 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Jun 01 07:06:39 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Jun 01 07:06:39 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Jun 01 07:06:39 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Jun 01 07:06:39 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
Jun 01 07:06:39 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
Jun 01 07:06:39 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
Jun 01 07:06:39 at java.lang.Thread.run(Thread.java:748)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22833) Source tasks (both old and new) are not reporting checkpointStartDelay via CheckpointMetrics

2021-06-01 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-22833:
--

 Summary: Source tasks (both old and new) are not reporting 
checkpointStartDelay via CheckpointMetrics
 Key: FLINK-22833
 URL: https://issues.apache.org/jira/browse/FLINK-22833
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.4, 1.13.1
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: recover from svaepoint

2021-06-01 Thread Till Rohrmann
The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski  @Becket Qin  how
does the transaction id generation ensures that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I do see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.

fyi: @Arvid Heise 

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞  wrote:

> HI:
>   When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from svaepoint
>
>public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' =
> '90',\n" +
> "   'sink.partitioner' =
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Dipanjan Mazumder
 Thanks Till will do so ...
On Tuesday, June 1, 2021, 06:22:32 PM GMT+5:30, Till Rohrmann 
 wrote:  
 
 Hi Dipanjan,
this type of question is best sent to Flink's user mailing list because there 
are a lot more people using Flink who could help you. The dev mailing list is 
intended to be used for development discussions.
Cheers,Till
On Tue, Jun 1, 2021 at 1:31 PM Dipanjan Mazumder  wrote:

Hi ,
   I have integrated flink-siddhi library ([com.github.haoch/flink-siddhi_2.11 
"0.2.2-SNAPSHOT"]) and i tried to configure and implement control stream from 
flink-siddh and it broke with AbstractMethodError. When i tried running the 
same with flink 1.11.0 it worked.
More Details is given in this stack overflow link : Flink-Siddhi control event 
failing to start
Any help on this will be very great and will help me go forward:

| 
| 
| 
|  |  |

 |

 |
| 
|  | 
Flink-Siddhi control event failing to start

While trying to configure and implement flink- 
siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...
 |

 |

 |





| 
| 
| 
|  |  |

 |

 |
| 
|  | 
[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]

[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"] null
 |

 |

 |


RegardsDipanjan

  

[jira] [Created] (FLINK-22832) Drop usages of legacy planner in SQL Client

2021-06-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-22832:


 Summary: Drop usages of legacy planner in SQL Client
 Key: FLINK-22832
 URL: https://issues.apache.org/jira/browse/FLINK-22832
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client, Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


Drop legacy planner support for SQL Client



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

this type of question is best sent to Flink's user mailing list because
there are a lot more people using Flink who could help you. The dev mailing
list is intended to be used for development discussions.

Cheers,
Till

On Tue, Jun 1, 2021 at 1:31 PM Dipanjan Mazumder  wrote:

> Hi ,
>
>I have integrated flink-siddhi library ([com.github.haoch/flink-siddhi_2.11
> "0.2.2-SNAPSHOT"]
> )
> and i tried to configure and implement control stream from flink-siddh and
> it broke with AbstractMethodError. When i tried running the same with flink
> 1.11.0 it worked.
>
> More Details is given in this stack overflow link : Flink-Siddhi control
> event failing to start
> 
>
> Any help on this will be very great and will help me go forward:
>
> Flink-Siddhi control event failing to start
>
> While trying to configure and implement flink-
> siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...
>
> 
>
>
>
> [com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]
>
> [com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"] null
>
> 
>
> Regards
> Dipanjan
>
>


[jira] [Created] (FLINK-22831) Drop usages of legacy planner in Scala shell

2021-06-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-22831:


 Summary: Drop usages of legacy planner in Scala shell
 Key: FLINK-22831
 URL: https://issues.apache.org/jira/browse/FLINK-22831
 Project: Flink
  Issue Type: Sub-task
  Components: Scala Shell, Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


Remove references to {{flink-table-planner}} in the Scala shell.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22830) DataSet API to allow to collect and do the bulk upload in DB

2021-06-01 Thread Ravi (Jira)
Ravi created FLINK-22830:


 Summary: DataSet API to allow to collect and do the bulk upload in 
DB
 Key: FLINK-22830
 URL: https://issues.apache.org/jira/browse/FLINK-22830
 Project: Flink
  Issue Type: Improvement
Reporter: Ravi


Experts,

 

I am trying to perform some ETL operation on large data set as batch 
processing. My requirement is to extract the data , transform it and then save 
to mongoDB. I am using Apache FLINK but the performance is very slow as I am 
doing the mongoDB update on each row.

Is there any way where we can sink as bulk record so that performance can 
increase. Like after all the transformation we do the bulk update on mongoDB. 
We can aggregate them all and finally sink it in DB just like stream 
[.aggregate() .sink(\{bulk update})]  . Please refer 
[https://stackoverflow.com/questions/67717964/java-apache-flink-batch-processing-performance-for-bulk-mongodb-update]
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Dipanjan Mazumder
Hi ,
   I have integrated flink-siddhi library ([com.github.haoch/flink-siddhi_2.11 
"0.2.2-SNAPSHOT"]) and i tried to configure and implement control stream from 
flink-siddh and it broke with AbstractMethodError. When i tried running the 
same with flink 1.11.0 it worked.
More Details is given in this stack overflow link : Flink-Siddhi control event 
failing to start
Any help on this will be very great and will help me go forward:

| 
| 
| 
|  |  |

 |

 |
| 
|  | 
Flink-Siddhi control event failing to start

While trying to configure and implement flink- 
siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...
 |

 |

 |





| 
| 
| 
|  |  |

 |

 |
| 
|  | 
[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]

[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"] null
 |

 |

 |


RegardsDipanjan


Re: How to use or configure flink checkpointing with siddhi internal state

2021-06-01 Thread Dipanjan Mazumder
 Hi Till,
    Thanks so that means it should work will try and see ..
RegardsDipanjan
On Tuesday, June 1, 2021, 01:48:19 PM GMT+5:30, Till Rohrmann 
 wrote:  
 
 Hi Dipanjan,
I am assuming that you are using the flink-siddhi library [1]. I am not an 
expert but it looks as if the AbstractSiddhiOperator overrides the 
snapshotState [2] method to store the Siddhi state in Flink.
[1] https://github.com/haoch/flink-siddhi[2] 
https://github.com/haoch/flink-siddhi/blob/master/core/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java#L331
Cheers,Till
On Mon, May 31, 2021 at 7:27 PM Dipanjan Mazumder  
wrote:

Hi ,
   I was trying to do checkpointing while using siddhi as the CEP engine 
running on flink. While using siddhi windowing , it uses an internal state to 
aggregated or perform operation on a bucket of events pertaining to a specific 
time window. But what i am not sure is how can that state be mapped to Flinks 
internal state so that i can use flink checkpointing to safeguard the internal 
state of the siddhi operators in the face of failure.
Any help or pointer for this will be of great help to me.Thanks in advance.
Dipanjan -
  

[jira] [Created] (FLINK-22829) Drop usages of legacy planner in Hbase modules

2021-06-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-22829:


 Summary: Drop usages of legacy planner in Hbase modules
 Key: FLINK-22829
 URL: https://issues.apache.org/jira/browse/FLINK-22829
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / HBase, Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


Remove references to {{flink-table-planner}} in the Hbase modules.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] SQL CTAS Syntax

2021-06-01 Thread Jingsong Li
Thanks Danny for starting the discussion.

+1 for this feature.

I like schema evolution.

A question:

Can we support multiple pipelines? For example, I have three source tables
and just one sink table.

So, I will write three CTAS:
- CTAS IF NOT EXISTS 1
- CTAS IF NOT EXISTS 2
- CTAS IF NOT EXISTS 3

Do I have a chance to configure different sink options? Something like
dynamic table options?

Best,
Jingsong

On Fri, May 28, 2021 at 10:10 PM Kurt Young  wrote:

> Hi Konstantin,
>
> From my understanding, this syntax has 2 major benefits:
>
> 1. Just like you said, it saves the effort to specify the schema,
> especially when involving hundreds of fields.
> 2. When using CREATE TABLE xx AS TABLE yy, it gives us the possibility to
> enable schema evolution, and it seems pretty natural to do so.
>
> Best,
> Kurt
>
>
> On Fri, May 28, 2021 at 5:44 PM Konstantin Knauf 
> wrote:
>
> > Hi everyone,
> >
> > quick question for my understanding: how is this different to
> >
> > CREATE TABLE IF NOT EXISTS my_table (
> > ...
> > ) WITH (
> > ...
> > );
> > INSERT INTO my_table SELECT ...;
> >
> > ?
> >
> > Is it only about a) not having to specify the schema and b) a more
> > condensed syntax?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Fri, May 28, 2021 at 11:30 AM Jark Wu  wrote:
> >
> > > Thanks Danny for starting the discussion of extending CTAS syntax.
> > >
> > > I think this is a very useful feature for data integration and ETL jobs
> > (a
> > > big use case of Flink).
> > > Many users complain a lot that manually defining schemas for sources
> and
> > > sinks is hard.
> > > CTAS helps users to write ETL jobs without defining any schemas of
> > sources
> > > and sinks.
> > > CTAS automatically creates physical tables in external systems, and
> > > automatically
> > > maps external tables to Flink tables with the help of catalogs (e.g.
> > > PgCatalog, HiveCatalog).
> > >
> > > On the other hand, the schema of the SELECT query is fixed after
> compile
> > > time.
> > > CTAS TABLE extends the syntax which allows dynamic schema during
> runtime,
> > > semantically it streaming copies the up-to-date structure and data (if
> > run
> > > in streaming mode).
> > > So I think CTAS TABLE is a major step forward for data integration, it
> > > defines a syntax
> > > which allows the underlying streaming pipeline automatically migrate
> > schema
> > > evolution
> > > (e.g. ADD COLUMN) from source tables to sink tables without stopping
> jobs
> > > or updating SQLs.
> > >
> > > Therefore, I'm +1 for the feature.
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 28 May 2021 at 16:22, JING ZHANG  wrote:
> > >
> > > > Hi Danny,
> > > >
> > > > Thanks for starting this discussion.
> > > >
> > > >
> > > >
> > > > Big +1 for this feature. Both CTAS AND CREATE TABLE LIKE are very
> > useful
> > > > features. IMO, it is clear to separate them into two parts in the
> > > `syntax`
> > > > character. 
> > > >
> > > >
> > > >
> > > > First, I have two related problems:
> > > >
> > > >
> > > > 1. Would `create table` in CTAS trigger to create a physical table in
> > > > external storage system?
> > > >
> > > > For example, now normal `create table` would only define a connecting
> > > with
> > > > an existed external Kafka topic instead of trigger to create a
> physical
> > > > kafka topic in kafka cluster. Does this behavior still work for CTAS
> > AND
> > > > CREATE TABLE LIKE?
> > > >
> > > >
> > > > 2. Would the data sync  of CTAS run continuously if select works on a
> > > > unbounded source?
> > > >
> > > > Since sub select query may works on unbounded source in Flink, which
> is
> > > > different with other system (postgres, spark, hive, mysql). Does data
> > > sync
> > > > continuously run or just sync the snapshot at the job submit?
> > > >
> > > >
> > > >
> > > > Besides, I have some minor problems which is mentioned in your email.
> > > >
> > > >
> > > >
> > > > > how to write data into existing table with history data declare [IF
> > NOT
> > > > EXISTS] keywords and we ignore the table creation but the pipeline
> > still
> > > > starts up
> > > >
> > > >
> > > >
> > > > Maybe we should check old schema and new schema. What would happen if
> > > > schema of existed table is different with new schema?
> > > >
> > > >
> > > >
> > > > > How to match sub-database and sub-table ? Use regex style source
> > table
> > > > name
> > > >
> > > >
> > > >
> > > >1. What would happen if schema of matched tables different with
> each
> > > > other?
> > > >
> > > >2. What orders to sync data of all matched table? Sync data from
> all
> > > > matched tables one by one or at the same time?
> > > >
> > > >
> > > >
> > > > >  AS select_statement: copy source table data into target
> > > >
> > > >
> > > >
> > > > User could  explicitly specify the data type for each column in the
> > CTAS,
> > > > what happened when run the following example. The demo is from MySQL
> > > > document,
> > > 

[jira] [Created] (FLINK-22828) Allow using a custom AWS credentials provider for the Kinesis Connector

2021-06-01 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-22828:
---

 Summary: Allow using a custom AWS credentials provider for the 
Kinesis Connector
 Key: FLINK-22828
 URL: https://issues.apache.org/jira/browse/FLINK-22828
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Affects Versions: 1.14.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 1.14.0


Users currently have to use the credential providers that are pre-configured in 
Kinesis connector. 

For advanced users, it would be nice to be able to configure it similar to 
Presto: 
https://prestodb.io/docs/0.187/connector/hive.html#custom-s3-credentials-provider



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-01 Thread Chesnay Schepler

There's a related effort: https://issues.apache.org/jira/browse/FLINK-21108

On 6/1/2021 10:14 AM, Till Rohrmann wrote:

Hi Gabor, welcome to the Flink community!

Thanks for sharing this proposal with the community Márton. In general, I
agree that authentication is missing and that this is required for using
Flink within an enterprise. The thing I am wondering is whether this
feature strictly needs to be implemented inside of Flink or whether a proxy
setup could do the job? Have you considered this option? If yes, then it
would be good to list it under the point of rejected alternatives.

I do see the benefit of implementing this feature inside of Flink if many
users need it. If not, then it might be easier for the project to not
increase the surface area since it makes the overall maintenance harder.

Cheers,
Till

On Mon, May 31, 2021 at 4:57 PM Márton Balassi  wrote:


Hi team,

Firstly I would like to introduce Gabor or G [1] for short to the
community, he is a Spark committer who has recently transitioned to the
Flink Engineering team at Cloudera and is looking forward to contributing
to Apache Flink. Previously G primarily focused on Spark Streaming and
security.

Based on requests from our customers G has implemented Kerberos and HTTP
Basic Authentication for the Flink Dashboard and HistoryServer. Previously
lacked an authentication story.

We are looking to contribute this functionality back to the community, we
believe that given Flink's maturity there should be a common code solution
for this general pattern.

We are looking forward to your feedback on G's design. [2]

[1] http://gaborsomogyi.com/
[2]

https://docs.google.com/document/d/1NMPeJ9H0G49TGy3AzTVVJVKmYC0okwOtqLTSPnGqzHw/edit





[jira] [Created] (FLINK-22827) Hive CREATE TABLE Support CLUSTERED BY

2021-06-01 Thread Ma Jun (Jira)
Ma Jun created FLINK-22827:
--

 Summary: Hive CREATE TABLE Support CLUSTERED BY
 Key: FLINK-22827
 URL: https://issues.apache.org/jira/browse/FLINK-22827
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: Ma Jun


{code:java}
# hive syntax:
CREATE [ EXTERNAL ] TABLE [ IF NOT EXISTS ] table_identifier
[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... ) 
| ( col_name1, col_name2, ... ) ]
[ CLUSTERED BY ( col_name1, col_name2, ...) 
[ SORTED BY ( col_name1 [ ASC | DESC ], col_name2 [ ASC | DESC ], ... ) 
] 
INTO num_buckets BUCKETS ]
[ ROW FORMAT row_format ]
[ STORED AS file_format ]
[ LOCATION path ]
[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
[ AS select_statement ]

{code}
 
{code:java}
[ CLUSTERED BY ( col_name1, col_name2, ...) [ SORTED BY ( col_name1 [ ASC | 
DESC ], col_name2 [ ASC | DESC ], ... ) ] 
{code}
Will Flink support the way of creating tables and supporting clustered by | 
sort by into buckets in later versions?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The partition tracker should support remote shuffle properly

2021-06-01 Thread Till Rohrmann
Hi Jin,

thanks for starting this discussion and the initiative to implement a
remote shuffle service. It has always been the idea of the ShuffleService
abstraction to make this possible and we probably have overlooked some
details.

What I have understood from your description, you would like to introduce
the locationID which points to the location where the result partition is
stored (potentially external). Using the locationID and the tmID it is
possible to say whether the partition is stored externally or not.

I think that deciding whether the partition is stored externally or not (or
more precisely whether the partition occupies resources on the TM) can be
answered by using the ShuffleDescriptor.storesLocalResourcesOn method. If
it returns some ResourceID then we have to tell the TM about the release.
If not, then we only tell the shuffle master about the partition release.
How the data can be accessed on the external system is then encapsulated by
the ShuffleMaster and the ShuffleDescriptor. The logic for releasing the
partitions on the TMs and the ShuffleMaster should already be implemented
in the JobMasterPartitionTrackerImpl.

I think what we need to change is that we don't stop the tracking of
completed partitions when a TM on which the producers run disconnects and
if we store the result partition externally. This is required to make
partitions survive in case of TM failures. What this also requires is to
distinguish between finished and in-progress partitions.

What indeed is currently not implemented is the channel from the
ShuffleMaster to the JobMasterPartitionTrackerImpl. This is, however, not a
big problem atm. If the ShuffleMaster should lose a result partition, then
a reading task should fail with a PartitionException which will invalidate
the partition on the JMPartitionTracker so that it is reproduced. Listening
to the ShuffleMaster would be an optimization to learn more quickly about
this fact and to avoid a restart cycle.

Did I understand you correctly, Jin, and do my comments make sense?

Cheers,
Till

On Wed, May 26, 2021 at 5:52 AM XING JIN  wrote:

> Hi devs ~
> Recently our team designed and started to build Flink remote shuffle
> service based on 'pluggable shuffle service framework'[1] for batch
> processing jobs. We found some potential enhancements could be made on
> 'pluggable shuffle service' and created an umbrella JIRA[2]. I raise this
> DISCUSSION and want to hear broader feedback / comments on one ticket [3]
> -- "The partition tracker should support remote shuffle properly".
>
> In current Flink, data partition is bound with the ResourceID of TM in
> Execution#startTrackingPartitions and JM partition tracker will stop
> tracking corresponding partitions when a TM
> disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle
> data is bound with computing resource (TM). It works fine for internal
> shuffle service, but doesn't for remote shuffle service. Note that shuffle
> data is accommodated on remote, the lifecycle of a completed partition is
> capable to be decoupled with TM, i.e. TM is totally fine to be released
> when no computing task is on it and further shuffle reading requests could
> be directed to remote shuffle cluster. In addition, when a TM is lost, its
> completed data partitions on remote shuffle cluster could avoid
> reproducing.
>
> The issue mentioned above is because Flink JobMasterPartitionTracker mixed
> up partition's locationID (where the partition is located) and tmID (which
> TM the partition is produced from). In TM internal shuffle, partition's
> locationID is the same with tmID, but it is not in remote shuffle;
> JobMasterPartitionTracker as an independent component should be able to
> differentiate locationID and tmID of a partition, thus to handle the
> lifecycle of a partition properly;
>
> We propose that JobMasterPartitionTracker indexes partitions with both
> locationID and tmID. The process of registration and unregistration will be
> like below:
>
> A. Partition Registration
> - Execution#registerProducedPartitions registers partition to ShuffleMaster
> and gets a ShuffleDescriptor. Current
> ShuffleDescriptor#storesLocalResourcesOn returns the location of the
> producing TM ONLY IF the partition occupies local resources there. We
> propose to change this method a proper name and always return the
> locationID of the partition. It might be as below:
> ResourceID getLocationID();
> - Execution#registerProducePartitions then registers partition to
> JMPartitionTracker with tmID (ResourceID of TaskManager from
> TaskManagerLocation) and the locationID (acquired in above step).
> JobMasterPartitionTracker will indexes a partition with both tmID and
> locationID;
>
> B. Invokes from JM and ShuffleMaster
> JobMasterPartitionTracker listens invokes from both JM and ShuffleMaster.
> - When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a
> TM disconnects, it will check whether the disconnected 

[jira] [Created] (FLINK-22826) link sql1.13.1基于change log流数据join导致数据丢失

2021-06-01 Thread Jira
徐州州 created FLINK-22826:
---

 Summary: link sql1.13.1基于change log流数据join导致数据丢失
 Key: FLINK-22826
 URL: https://issues.apache.org/jira/browse/FLINK-22826
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1, 1.12.0
Reporter: 徐州州


{code:java}
insert into dwd_order_detail
select
   ord.Id,
   ord.Code,
   Status
 concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id  as 
STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd'))  as uuids,
 TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date
from
orders ord
left join order_extend oed on  ord.Id=oed.OrderId and oed.IsDeleted=0 and 
oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS 
TIMESTAMP)
or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
) and ord.IsDeleted=0;
{code}
My upsert-kafka table for PRIMARY KEY for uuids.

This is the logic of my kafka based canal-json stream data join and write to 
Upsert-kafka tables I confirm that version 1.12 also has this problem I just 
upgraded from 1.12 to 1.13.

I look up a user s order data and order number XJ0120210531004794 in canal-json 
original table as U which is normal.
{code:java}
| +U | XJ0120210531004794 |  50 |
| +U | XJ0120210531004672 |  50 |
{code}
But written to upsert-kakfa via join, the data consumed from upsert kafka is,
{code:java}
| +I | XJ0120210531004794 |  50 |
| -U | XJ0120210531004794 |  50 |
{code}
The order is two records this sheet in orders and order_extend tables has not 
changed since created -U status caused my data loss not computed and the final 
result was wrong.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22825) link sql1.13.1基于change log流数据join导致数据丢失

2021-06-01 Thread Jira
徐州州 created FLINK-22825:
---

 Summary: link sql1.13.1基于change log流数据join导致数据丢失
 Key: FLINK-22825
 URL: https://issues.apache.org/jira/browse/FLINK-22825
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1, 1.12.0
Reporter: 徐州州


|insert into dwd_order_detail
|select
| ord.Id,
| ord.Code,
| Status
| concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id as 
STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids,
| TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date
|from
|orders ord
|left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and 
oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
|where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS 
TIMESTAMP)
|or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
|or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
|) and ord.IsDeleted=0;
My upsert-kafka table for PRIMARY KEY for uuids.
This is the logic of my kafka based canal-json stream data join and write to 
Upsert-kafka tables I confirm that version 1.12 also has this problem I just 
upgraded from 1.12 to 1.13.
I look up a user s order data and order number XJ0120210531004794 in canal-json 
original table as U which is normal.
| +U | XJ0120210531004794 | 50 |
| +U | XJ0120210531004672 | 50 |

But written to upsert-kakfa via join, the data consumed from upsert kafka is,
| +I | XJ0120210531004794 | 50 |
| -U | XJ0120210531004794 | 50 |
The order is two records this sheet in orders and order_extend tables has not 
changed since created -U status caused my data loss not computed and the final 
result was wrong.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22824) Drop usages of legacy planner in Kafka modules

2021-06-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-22824:


 Summary: Drop usages of legacy planner in Kafka modules
 Key: FLINK-22824
 URL: https://issues.apache.org/jira/browse/FLINK-22824
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka, Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


Remove references to {{flink-table-planner}} in the Kafka modules.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22823) Add Basic and SPNEGO authentication support

2021-06-01 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-22823:
-

 Summary: Add Basic and SPNEGO authentication support
 Key: FLINK-22823
 URL: https://issues.apache.org/jira/browse/FLINK-22823
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.13.1
Reporter: Gabor Somogyi


The Flink Dashboard and HistoryServer lacks any kind of authentication but 
enterprise users are expecting that.
 The intention is to add:
 * Basic authentication for simple/less sensitive use-cases
 * SPNEGO authentication for highly sensitive use-cases



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: How to use or configure flink checkpointing with siddhi internal state

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

I am assuming that you are using the flink-siddhi library [1]. I am not an
expert but it looks as if the AbstractSiddhiOperator overrides the
snapshotState [2] method to store the Siddhi state in Flink.

[1] https://github.com/haoch/flink-siddhi
[2]
https://github.com/haoch/flink-siddhi/blob/master/core/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java#L331

Cheers,
Till

On Mon, May 31, 2021 at 7:27 PM Dipanjan Mazumder 
wrote:

> Hi ,
>I was trying to do checkpointing while using siddhi as the CEP engine
> running on flink. While using siddhi windowing , it uses an internal state
> to aggregated or perform operation on a bucket of events pertaining to a
> specific time window. But what i am not sure is how can that state be
> mapped to Flinks internal state so that i can use flink checkpointing to
> safeguard the internal state of the siddhi operators in the face of failure.
> Any help or pointer for this will be of great help to me.Thanks in advance.
> Dipanjan -


Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-01 Thread Till Rohrmann
Hi Gabor, welcome to the Flink community!

Thanks for sharing this proposal with the community Márton. In general, I
agree that authentication is missing and that this is required for using
Flink within an enterprise. The thing I am wondering is whether this
feature strictly needs to be implemented inside of Flink or whether a proxy
setup could do the job? Have you considered this option? If yes, then it
would be good to list it under the point of rejected alternatives.

I do see the benefit of implementing this feature inside of Flink if many
users need it. If not, then it might be easier for the project to not
increase the surface area since it makes the overall maintenance harder.

Cheers,
Till

On Mon, May 31, 2021 at 4:57 PM Márton Balassi  wrote:

> Hi team,
>
> Firstly I would like to introduce Gabor or G [1] for short to the
> community, he is a Spark committer who has recently transitioned to the
> Flink Engineering team at Cloudera and is looking forward to contributing
> to Apache Flink. Previously G primarily focused on Spark Streaming and
> security.
>
> Based on requests from our customers G has implemented Kerberos and HTTP
> Basic Authentication for the Flink Dashboard and HistoryServer. Previously
> lacked an authentication story.
>
> We are looking to contribute this functionality back to the community, we
> believe that given Flink's maturity there should be a common code solution
> for this general pattern.
>
> We are looking forward to your feedback on G's design. [2]
>
> [1] http://gaborsomogyi.com/
> [2]
>
> https://docs.google.com/document/d/1NMPeJ9H0G49TGy3AzTVVJVKmYC0okwOtqLTSPnGqzHw/edit
>


[jira] [Created] (FLINK-22822) Drop usages of legacy planner in JDBC module

2021-06-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-22822:


 Summary: Drop usages of legacy planner in JDBC module
 Key: FLINK-22822
 URL: https://issues.apache.org/jira/browse/FLINK-22822
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC, Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


Remove references to {{flink-table-planner}} in the JDBC module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22821) FlinkKafkaProducerMigrationTest fails with "Address already in use"

2021-06-01 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22821:


 Summary: FlinkKafkaProducerMigrationTest fails with "Address 
already in use"
 Key: FLINK-22821
 URL: https://issues.apache.org/jira/browse/FLINK-22821
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18469=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6832

{code}
Jun 01 01:27:33 java.net.BindException: Address already in use
Jun 01 01:27:33 at sun.nio.ch.Net.bind0(Native Method)
Jun 01 01:27:33 at sun.nio.ch.Net.bind(Net.java:461)
Jun 01 01:27:33 at sun.nio.ch.Net.bind(Net.java:453)
Jun 01 01:27:33 at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
Jun 01 01:27:33 at 
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
Jun 01 01:27:33 at 
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
Jun 01 01:27:33 at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:90)
Jun 01 01:27:33 at 
org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:120)
Jun 01 01:27:33 at 
org.apache.curator.test.TestingZooKeeperMain.runFromConfig(TestingZooKeeperMain.java:93)
Jun 01 01:27:33 at 
org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:148)
Jun 01 01:27:33 at java.lang.Thread.run(Thread.java:748)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22820) Stopping Yarn session cluster will cause fatal error

2021-06-01 Thread Yang Wang (Jira)
Yang Wang created FLINK-22820:
-

 Summary: Stopping Yarn session cluster will cause fatal error
 Key: FLINK-22820
 URL: https://issues.apache.org/jira/browse/FLINK-22820
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.1, 1.14.0
Reporter: Yang Wang
 Attachments: log.jm

Stopping the Yarn session cluster via {{echo "stop" | ./bin/yarn-session.sh -id 
application_}} will have the following fatal error. A full jobmanager log 
is also attached.


{code:java}
2021-06-01 15:37:38,005 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'flink-akka.actor.default-dispatcher-4' produced an uncaught exception. 
Stopping the process...
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: 
The job State machine job(2fab9b478eb86deade69c613fe0ab58b) is not in a 
globally terminal state. Instead it is in state SUSPENDED.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:824) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 ~[?:1.8.0_102]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: java.lang.IllegalArgumentException: The job State machine 
job(2fab9b478eb86deade69c613fe0ab58b) is not in a globally terminal state. 
Instead it is in state SUSPENDED.
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.put(FileExecutionGraphInfoStore.java:168)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:845)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedTerminalState(Dispatcher.java:836)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:443)
 

[jira] [Created] (FLINK-22819) YARNFileReplicationITCase fails with "The YARN application unexpectedly switched to state FAILED during deployment"

2021-06-01 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22819:


 Summary: YARNFileReplicationITCase fails with "The YARN 
application unexpectedly switched to state FAILED during deployment"
 Key: FLINK-22819
 URL: https://issues.apache.org/jira/browse/FLINK-22819
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.13.1
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18467=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=32007

{code}
May 31 23:14:22 org.apache.flink.client.deployment.ClusterDeploymentException: 
Could not deploy Yarn job cluster.
May 31 23:14:22 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
May 31 23:14:22 at 
org.apache.flink.yarn.YARNFileReplicationITCase.deployPerJob(YARNFileReplicationITCase.java:106)
May 31 23:14:22 at 
org.apache.flink.yarn.YARNFileReplicationITCase.lambda$testPerJobModeWithDefaultFileReplication$1(YARNFileReplicationITCase.java:78)
May 31 23:14:22 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:287)
May 31 23:14:22 at 
org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithDefaultFileReplication(YARNFileReplicationITCase.java:78)
May 31 23:14:22 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
May 31 23:14:22 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
May 31 23:14:22 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
May 31 23:14:22 at java.lang.reflect.Method.invoke(Method.java:498)
May 31 23:14:22 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
May 31 23:14:22 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
May 31 23:14:22 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
May 31 23:14:22 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
May 31 23:14:22 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
May 31 23:14:22 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
May 31 23:14:22 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
May 31 23:14:22 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
May 31 23:14:22 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
May 31 23:14:22 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
May 31 23:14:22 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
May 31 23:14:22 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
May 31 23:14:22 at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
May 31 23:14:22 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
May 31 23:14:22 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
May 31 23:14:22 at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
May 31 23:14:22 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
May 31 23:14:22 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
May 31 23:14:22 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
May 31 23:14:22 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
May 31 23:14:22 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
May 31 23:14:22 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
May 31 23:14:22 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
May 31 23:14:22 at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
May 31 23:14:22 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
May 31 23:14:22 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
May 31 23:14:22 at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
May 31 23:14:22 at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
May 31 23:14:22 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
May 31 23:14:22 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
May 31 23:14:22 at 

[jira] [Created] (FLINK-22818) IgnoreInFlightDataITCase fails on azure

2021-06-01 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22818:


 Summary: IgnoreInFlightDataITCase fails on azure
 Key: FLINK-22818
 URL: https://issues.apache.org/jira/browse/FLINK-22818
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18465=logs=a549b384-c55a-52c0-c451-00e0477ab6db=81f2da51-a161-54c7-5b84-6001fed26530=9807

{code}
May 31 22:28:49 [ERROR] Failures: 
May 31 22:28:49 [ERROR]   
IgnoreInFlightDataITCase.testIgnoreInFlightDataDuringRecovery:101 
May 31 22:28:49 Expected: a value less than <57464560L>
May 31 22:28:49  but: <57464560L> was equal to <57464560L>
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22817) About flink1.13 hive integration Kafka SQL

2021-06-01 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-22817:
--

 Summary: About flink1.13 hive integration Kafka SQL 
 Key: FLINK-22817
 URL: https://issues.apache.org/jira/browse/FLINK-22817
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Affects Versions: 1.13.1
 Environment: flink: 1.13.1

flink calcite: 1.26.1

 

kafka-eagle: 2.0.5

kafka-eagle calcite 1.21.0
Reporter: xingyuan cheng
 Attachments: hive-2021-06-01-2.png, hive-2021-06-01-3.png, 
hive-2021-06-01.png

hello, I observe the community’s proposal
Flip-152: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
We can know that



```

CREATE CATALOG myhive WITH (
'Type' ='hive',
'Default-database' ='default',
'Hive-conf-dir' ='/data/hive/conf/'
);

To

USE CATALOG myhive;

Set table.sql-dialect=hive;

```

When specifying the sql-dialect type, it will follow the SPI from the 
configuration file

The org.apache.flink.table.factories.TableFactory file under 
flink-connector-hive is assigned to HiveParserFactory to parse the 
corresponding grammar, and HiveParser is the corresponding grammar parser

And perform grammatical analysis in HiveParserFactory#create



During the course of investigating kafka-eagle, I found

Among them, KSQL is also based on calcite for grammatical analysis, and can 
support DDL and DML of kafka tables

Test-related classes are in: KSqlParser#TestKSqlParser

And completed the analysis of the corresponding grammar in 
KsqlParser#parseQueryKSql

Does the community have any good suggestions for this proposal?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)