Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
Ingo,

It looks like I'm now seeing "Caused by: java.lang.NullPointerException:
You must specify a value for roleArn and roleSessionName". I assume I would
also need to specify that through the configuration file. Could you suggest
the key for this configuration? Thanks.

Thomas
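
For reference, AWS SDK v1's WebIdentityTokenCredentialsProvider reads the
web-identity environment variables that the EKS IRSA webhook injects into the
pod; older v1 releases are known to throw exactly this NullPointerException
when AWS_ROLE_SESSION_NAME is unset. A sketch of setting it explicitly in the
pod spec, with illustrative values:

env:
  - name: AWS_ROLE_ARN
    value: arn:aws:iam::111122223333:role/flink-checkpoints  # hypothetical role
  - name: AWS_WEB_IDENTITY_TOKEN_FILE
    value: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
  - name: AWS_ROLE_SESSION_NAME
    value: flink-s3  # any non-empty name; its absence can trigger the NPE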

On Sat, Sep 25, 2021 at 7:25 PM Thomas Wang  wrote:

> Thanks Ingo. Adding the following setting worked.
>
> fs.s3a.aws.credentials.provider:
> com.amazonaws.auth.WebIdentityTokenCredentialsProvider
>
> Thomas
>
> On Sat, Sep 25, 2021 at 1:12 PM Ingo Bürk  wrote:
>
>> Hi Thomas,
>>
>> I think you might be looking for this:
>> https://github.com/apache/flink/pull/16717
>>
>>
>> Best
>> Ingo
>>
>> On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:
>>
>>> Hi,
>>>
>>> I'm using the official docker image:
>>> apache/flink:1.12.1-scala_2.11-java11
>>>
>>> I'm trying to run a Flink job on an EKS cluster. The job is running
>>> under a k8s service account that is tied to an IAM role. If I'm not using
>>> s3 as RocksDB checkpoint backend, everything works just fine. However, when
>>> I enabled s3 as RocksDB checkpoint backend, I got permission denied.
>>>
>>> The IAM role tied to the service account has the appropriate permissions
>>> to s3. However the underlying role tied to the EKS node doesn't. After
>>> debugging with AWS support, it looks like the request to s3 was made under
>>> the EKS node role, not the role tied to the service account. Thus the
>>> permission denial.
>>>
>>> With the same Flink application, I'm also making requests to AWS Secrets
>>> Manager to get some sensitive information and those requests were made
>>> explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
>>> Those requests were made correctly with the IAM role tied to the service
>>> account.
>>>
>>> Based on the info above, I suspect Flink may be using an older version
>>> of the AWS SDK that doesn't support assuming an IAM role via an OIDC web
>>> identity token file. Please see AWS doc here:
>>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>>
>>> Could someone help me confirm this bug and maybe have it fixed some
>>> time? Thanks.
>>>
>>> Thomas
>>>
>>


Re: How can Flink SQL write query results to a sink table?

2021-09-25 Thread wukon...@foxmail.com
hi:
You can use a UDF in the SQL and write the results out to the target topic.
See this write-up: https://mp.weixin.qq.com/s/IKzCRTh8eamk6TX7_RHYdQ



wukon...@foxmail.com
 
From: JasonLee
Date: 2021-09-23 21:56
To: user-zh@flink.apache.org
Subject: Re: How can Flink SQL write query results to a sink table?
Hi
 
 
You can do this directly in SQL: write the query results into the sink table
with an INSERT INTO statement.
 
 
Best
JasonLee
 
 
On 2021-09-23 09:28, a reply noted the results can be written with the SQL
statement itself. [remainder of the message lost in the archive encoding]
 
Sent from iPhone
 
 
-- Original --
From: 2572805166 <2572805...@qq.com.INVALID>
Date: 2021-09-23 09:23
To: user-zh
You can use the CREATE TABLE ... LIKE syntax:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like
casel.chen wrote on 2021-09-18 8:27: The data comes from a kafka topic; how
can a Flink SQL job write its query results to a sink table?
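
A sketch of the usual pattern (connector options and names below are
illustrative): declare the sink table with CREATE TABLE, then populate it
from the query with INSERT INTO.

CREATE TABLE sink_table (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'output_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

INSERT INTO sink_table
SELECT id, name FROM source_table;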


Re: Write Streaming data to S3 in Parquet files

2021-09-25 Thread Guowei Ma
Hi, Harshvardhan

I think CaiZhi is right.
I have only a small addition: since you want to convert the Table to a
DataStream, you can look at FileSink with a ParquetWriterFactory [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats

Best,
Guowei
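
A minimal sketch of that approach (Flink 1.12-style APIs; the POJO "Event",
the bucket path, and the hourly bucket format are illustrative):

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

FileSink<Event> sink = FileSink
        .forBulkFormat(new Path("s3://my-bucket/output"),
                ParquetAvroWriters.forReflectRecord(Event.class))
        // one bucket directory per hour gives hourly "partitions"
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .build();

stream.sinkTo(sink);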


On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng  wrote:

> Hi!
>
> Try the PARTITIONED BY clause. See
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>
> Harshvardhan Shinde  wrote on Friday, September 24, 2021 at 5:52 PM:
>
>> Hi,
>> I wanted to know if we can write streaming data to S3 in parquet format
>> with partitioning.
>> Here's what I want to achieve:
>> I have a kafka table which gets updated with the data from kafka topic
>> and I'm using select statement to get the data into a Table and converting
>> into a stream as:
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>> Table table = tableEnv.sqlQuery("Select * from test");
>> DataStream stream = tableEnv.toDataStream(table);
>>
>> Now I want to write this stream to S3 in parquet files with hourly
>> partitions.
>>
>> Here are my questions:
>> 1. Is this possible?
>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>
>> Thanks and Regards,
>> Harshvardhan
>>
>>


Re: Job Manager went down on cancelling job with savepoint

2021-09-25 Thread Guowei Ma
Hi, Puneet

Could you share whether you are using Flink's session mode or application
mode?
From the log you are using `StandaloneDispatcher`, but that dispatcher is
used in both session and application mode.
If you are using application mode, this might be in line with expectations:
the cluster shuts down once the job reaches a globally terminal state.

Best,
Guowei
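
For reference, cancelling with a savepoint from the CLI looks like this (a
sketch; the savepoint path is illustrative, the job id is taken from the log
below):

./bin/flink stop --savepointPath s3://my-bucket/savepoints 1f764a51996d206b28721aa4a1420bea
# or the older, deprecated cancel-with-savepoint form:
./bin/flink cancel -s s3://my-bucket/savepoints 1f764a51996d206b28721aa4a1420bea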


On Fri, Sep 24, 2021 at 9:19 PM Puneet Duggal 
wrote:

> Hi,
>
> So while cancelling one job with a savepoint, even though the job got
> cancelled successfully, somehow immediately after that the job manager went
> down. I'm not able to deduce anything from the given stack trace. Any help
> is appreciated.
>
> 2021-09-24 11:50:44,182 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping
> checkpoint coordinator for job 1f764a51996d206b28721aa4a1420bea.
> 2021-09-24 11:50:44,182 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Shutting down
> 2021-09-24 11:50:44,240 INFO
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore [] - Removing
> /flink/default_ns/checkpoints/1f764a51996d206b28721aa4a1420bea from
> ZooKeeper
> 2021-09-24 11:50:44,243 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter [] -
> Shutting down.
> 2021-09-24 11:50:44,243 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter [] -
> Removing /checkpoint-counter/1f764a51996d206b28721aa4a1420bea from ZooKeeper
> 2021-09-24 11:50:44,249 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
> 1f764a51996d206b28721aa4a1420bea reached globally terminal state CANCELED.
> 2021-09-24 11:50:44,249 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
> Thread 'cluster-io-thread-16' produced an uncaught exception. Stopping the
> process...
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@54a5137c
> rejected from 
> java.util.concurrent.ScheduledThreadPoolExecutor@37ee0790[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4513]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> ~[?:1.8.0_232]
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_232]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
>
> Regards,
> Puneet
>
>
>


Re: flink-1.12.0 (and 1.13.2): select datas[1].filed_1, datas[1].filed_2 [rest of subject lost in the archive encoding]

2021-09-25 Thread kcz
Sorry, [message text lost in the archive encoding].




----
From: "kcz" <573693...@qq.com>
Date: Sunday, 2021-09-26 10:53
To: "user-zh"

flink-1.12.0 (and 1.13.2): select datas[1].filed_1, datas[1].filed_2 [rest of subject lost in the archive encoding]

2021-09-25 Thread kcz
When the element is taken by INDEX, [details lost in the archive encoding].
The value column is an array:
CREATE TABLE KafkaTable (
 datas array

Re: Write Streaming data to S3 in Parquet files

2021-09-25 Thread Caizhi Weng
Hi!

Try the PARTITIONED BY clause. See
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
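
For illustration, a sketch of such a partitioned sink (options follow the
linked page; the table, column, and bucket names are made up):

CREATE TABLE s3_parquet_sink (
  user_id BIGINT,
  payload STRING,
  dt STRING,
  hr STRING
) PARTITIONED BY (dt, hr) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/output',
  'format' = 'parquet',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO s3_parquet_sink
SELECT user_id, payload,
       DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd'),
       DATE_FORMAT(CURRENT_TIMESTAMP, 'HH')
FROM test;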

Harshvardhan Shinde  wrote on Friday, September 24, 2021 at 5:52 PM:

> Hi,
> I wanted to know if we can write streaming data to S3 in parquet format
> with partitioning.
> Here's what I want to achieve:
> I have a kafka table which gets updated with the data from kafka topic and
> I'm using select statement to get the data into a Table and converting into
> a stream as:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Table table = tableEnv.sqlQuery("Select * from test");
> DataStream stream = tableEnv.toDataStream(table);
>
> Now I want to write this stream to S3 in parquet files with hourly
> partitions.
>
> Here are my questions:
> 1. Is this possible?
> 2. If yes, how it can be achieved or link to appropriate documentation.
>
> Thanks and Regards,
> Harshvardhan
>
>


Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
Thanks Ingo. Adding the following setting worked.

fs.s3a.aws.credentials.provider:
com.amazonaws.auth.WebIdentityTokenCredentialsProvider

Thomas
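
For completeness, a sketch of how this sits in flink-conf.yaml next to the
checkpoint settings (the state backend keys are standard; the bucket is
illustrative):

fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints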

On Sat, Sep 25, 2021 at 1:12 PM Ingo Bürk  wrote:

> Hi Thomas,
>
> I think you might be looking for this:
> https://github.com/apache/flink/pull/16717
>
>
> Best
> Ingo
>
> On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:
>
>> Hi,
>>
>> I'm using the official docker image:
>> apache/flink:1.12.1-scala_2.11-java11
>>
>> I'm trying to run a Flink job on an EKS cluster. The job is running under
>> a k8s service account that is tied to an IAM role. If I'm not using s3 as
>> RocksDB checkpoint backend, everything works just fine. However, when I
>> enabled s3 as RocksDB checkpoint backend, I got permission denied.
>>
>> The IAM role tied to the service account has the appropriate permissions
>> to s3. However the underlying role tied to the EKS node doesn't. After
>> debugging with AWS support, it looks like the request to s3 was made under
>> the EKS node role, not the role tied to the service account. Thus the
>> permission denial.
>>
>> With the same Flink application, I'm also making requests to AWS Secrets
>> Manager to get some sensitive information and those requests were made
>> explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
>> Those requests were made correctly with the IAM role tied to the service
>> account.
>>
>> Based on the info above, I suspect Flink may be using an older version of
>> the AWS SDK that doesn't support assuming an IAM role via an OIDC web
>> identity token file. Please see AWS doc here:
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>
>> Could someone help me confirm this bug and maybe have it fixed some time?
>> Thanks.
>>
>> Thomas
>>
>


Re: flink-1.12.0: setting a watermark in the DDL errors, but 1.13.2 does not

2021-09-25 Thread Leonard Xu
This is a known bug [1]; it was fixed in 1.13.0 and 1.12.3, so you can use the
flink 1.12.5 or 1.13.2 patch releases.

[1] https://issues.apache.org/jira/browse/FLINK-22082

Best

> On Sep 25, 2021, at 21:29, kcz <573693...@qq.com.INVALID> wrote:
> 
> The SQL definition is below; on 1.12.0 the error goes away once the WATERMARK clause is removed.
> CREATE TABLE KafkaTable (
>  test ARRAY<ROW<signalValue ...>>,
>  gatherTime STRING,
>  log_ts as TO_TIMESTAMP(FROM_UNIXTIME(CAST(gatherTime AS 
> bigint)/1000,'yyyy-MM-dd HH:mm:ss')),
>  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'json'
> );
> 
> SELECT test[1].signalValue from KafkaTable;
> 
> 
> 
> 
> Exception in thread "main" scala.MatchError: ITEM($0, 1) (of class 
> org.apache.calcite.rex.RexCall)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
>   at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:127)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:62)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>   at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> 

Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Dhiru
We need to override the credentials provider to use
WebIdentityTokenFileCredentialsProvider, see
https://github.com/aws/aws-sdk-java-v2/issues/1470#issuecomment-543601232.
Otherwise the Java SDK gives precedence to the static secret/access keys over
the service account (SA).
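
A sketch of what that override looks like with SDK v2 (the region and the S3
client are illustrative; the provider reads the IRSA environment variables
injected into the pod):

import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

// Pinning the provider explicitly keeps the default chain from picking up
// static access/secret keys before the web-identity token.
S3Client s3 = S3Client.builder()
        .region(Region.US_EAST_1)
        .credentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
        .build();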

On Saturday, September 25, 2021, 04:37:22 PM EDT, Xiangyu Su
 wrote:

Hi Thomas,
did you try to log in to the EKS node and run an aws command like: aws s3 ls?
It sounds like an EKS issue, but not 100% sure.
Best

On Sat, 25 Sept 2021 at 22:12, Ingo Bürk  wrote:

Hi Thomas,
I think you might be looking for this: 
https://github.com/apache/flink/pull/16717

Best
Ingo
On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:

Hi,
I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11
I'm trying to run a Flink job on an EKS cluster. The job is running under a k8s 
service account that is tied to an IAM role. If I'm not using s3 as RocksDB 
checkpoint backend, everything works just fine. However, when I enabled s3 as 
RocksDB checkpoint backend, I got permission denied.
The IAM role tied to the service account has the appropriate permissions to s3. 
However the underlying role tied to the EKS node doesn't. After debugging with 
AWS support, it looks like the request to s3 was made under the EKS node role, 
not the role tied to the service account. Thus the permission denial.
With the same Flink application, I'm also making requests to AWS Secrets 
Manager to get some sensitive information and those requests were made 
explicitly with AWS Java SDK 2.x bundled in the same application Jar file. 
Those requests were made correctly with the IAM role tied to the service 
account.
Based on the info above, I suspect Flink may be using an older version of the 
AWS SDK that doesn't support assuming an IAM role via an OIDC web identity 
token file. Please see AWS doc here: 
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
Could someone help me confirm this bug and maybe have it fixed some time? 
Thanks.
Thomas



-- 
Xiangyu Su
Java Developer
xian...@smaato.com

Smaato Inc.
San Francisco - New York - Hamburg - Singapore
www.smaato.com

Germany:

Barcastraße 5

22087 Hamburg

Germany
M 0049(176)43330282


Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Xiangyu Su
Hi Thomas,
did you try to log in to the EKS node and run an aws command like: aws s3 ls?
It sounds like an EKS issue, but not 100% sure.
Best


On Sat, 25 Sept 2021 at 22:12, Ingo Bürk  wrote:

> Hi Thomas,
>
> I think you might be looking for this:
> https://github.com/apache/flink/pull/16717
>
>
> Best
> Ingo
>
> On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:
>
>> Hi,
>>
>> I'm using the official docker image:
>> apache/flink:1.12.1-scala_2.11-java11
>>
>> I'm trying to run a Flink job on an EKS cluster. The job is running under
>> a k8s service account that is tied to an IAM role. If I'm not using s3 as
>> RocksDB checkpoint backend, everything works just fine. However, when I
>> enabled s3 as RocksDB checkpoint backend, I got permission denied.
>>
>> The IAM role tied to the service account has the appropriate permissions
>> to s3. However the underlying role tied to the EKS node doesn't. After
>> debugging with AWS support, it looks like the request to s3 was made under
>> the EKS node role, not the role tied to the service account. Thus the
>> permission denial.
>>
>> With the same Flink application, I'm also making requests to AWS Secrets
>> Manager to get some sensitive information and those requests were made
>> explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
>> Those requests were made correctly with the IAM role tied to the service
>> account.
>>
>> Based on the info above, I suspect Flink may be using an older version of
>> the AWS SDK that doesn't support assuming an IAM role via an OIDC web
>> identity token file. Please see AWS doc here:
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>
>> Could someone help me confirm this bug and maybe have it fixed some time?
>> Thanks.
>>
>> Thomas
>>
>

-- 
Xiangyu Su
Java Developer
xian...@smaato.com

Smaato Inc.
San Francisco - New York - Hamburg - Singapore
www.smaato.com

Germany:

Barcastraße 5

22087 Hamburg

Germany
M 0049(176)43330282



Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Ingo Bürk
Hi Thomas,

I think you might be looking for this:
https://github.com/apache/flink/pull/16717


Best
Ingo

On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:

> Hi,
>
> I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11
>
> I'm trying to run a Flink job on an EKS cluster. The job is running under
> a k8s service account that is tied to an IAM role. If I'm not using s3 as
> RocksDB checkpoint backend, everything works just fine. However, when I
> enabled s3 as RocksDB checkpoint backend, I got permission denied.
>
> The IAM role tied to the service account has the appropriate permissions
> to s3. However the underlying role tied to the EKS node doesn't. After
> debugging with AWS support, it looks like the request to s3 was made under
> the EKS node role, not the role tied to the service account. Thus the
> permission denial.
>
> With the same Flink application, I'm also making requests to AWS Secrets
> Manager to get some sensitive information and those requests were made
> explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
> Those requests were made correctly with the IAM role tied to the service
> account.
>
> Based on the info above, I suspect Flink may be using an older version of
> the AWS SDK that doesn't support assuming an IAM role via an OIDC web
> identity token file. Please see AWS doc here:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>
> Could someone help me confirm this bug and maybe have it fixed some time?
> Thanks.
>
> Thomas
>


Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
Hi,

I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11

I'm trying to run a Flink job on an EKS cluster. The job is running under a
k8s service account that is tied to an IAM role. If I'm not using s3 as
RocksDB checkpoint backend, everything works just fine. However, when I
enabled s3 as RocksDB checkpoint backend, I got permission denied.

The IAM role tied to the service account has the appropriate permissions to
s3. However the underlying role tied to the EKS node doesn't. After
debugging with AWS support, it looks like the request to s3 was made under
the EKS node role, not the role tied to the service account. Thus the
permission denial.

With the same Flink application, I'm also making requests to AWS Secrets
Manager to get some sensitive information and those requests were made
explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
Those requests were made correctly with the IAM role tied to the service
account.

Based on the info above, I suspect Flink may be using an older version of
the AWS SDK that doesn't support assuming an IAM role via an OIDC web
identity token file. Please see AWS doc here:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html

Could someone help me confirm this bug and maybe have it fixed some time?
Thanks.

Thomas


flink-1.12.0: setting a watermark in the DDL errors, but 1.13.2 does not

2021-09-25 Thread kcz
The SQL definition is below; on 1.12.0 the error goes away once the WATERMARK clause is removed.
CREATE TABLE KafkaTable (
 test ARRAY<ROW<signalValue ...>>,

Re: how to run job and make jobmanager HA

2021-09-25 Thread houssem


1) You can use the application cluster mode; how to configure it is described
in the official Flink documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster

2) For HA you can use Kubernetes HA:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
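
To make 1) concrete, a minimal sketch of the jobmanager container in
application mode (the official image's entrypoint accepts "standalone-job",
which replaces running "flink run" by hand; the job class name below is
illustrative):

containers:
- name: jobmanager
  image: test:latest
  # "standalone-job" starts the JobManager in application mode and submits
  # the job packaged in the image itself.
  args: ["standalone-job", "--job-classname", "com.example.DeduplicationJob"]

And for 2), a sketch of the Kubernetes HA entries in flink-conf.yaml
(Flink 1.12; the cluster id and bucket are illustrative):

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha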

Best,

On 2021/09/24 22:58:48, Dhiru  wrote: 
> 
> please let me know if anyone can help me on this.
> 
> On Friday, September 24, 2021, 01:45:39 PM EDT, Dhiru  wrote:
> 
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       serviceAccountName: msc-s3-shared-content
>       containers:
>       - name: jobmanager
>         # flink:1.11.3-scala_2.12-java11 image with DeliveryStreams-0.0.1_3.1.0.jar copied to ./bin/flink
>         image: test:latest
>         args: ["jobmanager"]
>         # I am planning to run the job this way ... please let me know if this is the right way
>         command: ['./bin/flink', 'run', './bin/DeliveryStreams-0.0.1_3.1.0.jar', 'DeduplicationJob']
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob-server
>         - containerPort: 8081
>           name: webui
>         env:
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: flink-jobmanager
>         - name: KAFKA_BROKERS
>           value: kafka:29092
>         livenessProbe:
>           tcpSocket:
>             port: 6123
>           initialDelaySeconds: 30
>           periodSeconds: 60
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf
>         securityContext:
>           runAsUser:   # refers to user _flink_ from official flink image, change if necessary
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
> 
> a) I do not want to run my job from the UI; I want to run it from the
> jobmanager image at boot time, but when I try to run it, it throws an error.
> b) How do I make sure my jobmanager and taskmanager are HA-configured (so
> that if the jobmanager goes down we do not lose data)?
> 
> Thanks, Kumar
> 
>