Hi Xuyang,
thanks for the reply.
I've listed a complete example below. I have a working Flink cluster running in
AWS and accepting connections over HTTPS.
I think I'm probably just very confused about how this is meant to work.
What I expected:
- The executeSql statements to be executed remotely on the Flink cluster.
- The final executeSql to create a streaming job in the remote Flink cluster,
  which:
  - picks up parquet files as they land in s3://mybucket/parquet (e.g.
    s3://mybucket/parquet/in.parquet), and
  - appends their rows to the Iceberg table `out`.
What I see:
- All SQL executed locally.
- A job created in the remote Flink cluster:
  - after the SQL appears to have executed locally,
  - that looks like it has wired `in` to `out`
    but doesn't process records.
If I just create the `in` table below and stream it to print() I do see rows
printed locally in the console.
List below.
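For reference, the print() test I mean is roughly this (a local sketch, not the
exact code I ran; it reuses the `in` DDL from the listing below and the job name
is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PrintTest {
    public static void main(String[] args) throws Exception {
        // Local environment, so the filesystem source and console sink both run here.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql("""
                CREATE TABLE `in`(
                    `a` DOUBLE, `b` DOUBLE, `c` DOUBLE, `d` STRING
                ) WITH (
                    'connector' = 'filesystem',
                    'path' = 's3://mybucket/parquet/',
                    'format' = 'parquet',
                    'source.monitor-interval' = '5s'
                );""");
        // Convert the table to a DataStream and attach a console sink.
        tEnv.toDataStream(tEnv.from("in")).print();
        env.execute("print-test");
    }
}
```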
Thanks
Dan
// Imports inferred from usage; clusterHadoopConf() is assumed to be the static
// helper on org.apache.iceberg.flink.FlinkCatalogFactory.
import java.time.LocalDateTime;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.CatalogLoader;

import static org.apache.flink.configuration.SecurityOptions.SSL_REST_ENABLED;
import static org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf;

public class Example {

    public static void main(String[] args) throws Exception {
        org.apache.flink.configuration.Configuration configuration =
                new org.apache.flink.configuration.Configuration();
        configuration.setBoolean(SSL_REST_ENABLED, true);

        try (StreamExecutionEnvironment env =
                     StreamExecutionEnvironment.createRemoteEnvironment(
                             "myflinkcluster.example.com",
                             443,
                             configuration)) {

            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                    EnvironmentSettings.newInstance()
                            .inStreamingMode()
                            .build());

            var catalogName = "foo";
            var dbName = "baa";

            Configuration hadoopConf = clusterHadoopConf();
            hadoopConf.set("hive.vectorized.execution.enabled", "false");

            tEnv.registerCatalog(catalogName, new org.apache.iceberg.flink.FlinkCatalog(
                    catalogName,
                    "default",
                    Namespace.empty(),
                    CatalogLoader.custom(catalogName,
                            Map.of(
                                    "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
                                    "s3.sse.type", "kms",
                                    "catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog",
                                    "glue.skip-name-validation", "true",
                                    "type", "iceberg",
                                    "warehouse", "s3://mybucket/warehouse",
                                    "lock.table", "my-dynamo-table-data-lock",
                                    "lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager"
                            ),
                            hadoopConf,
                            "org.apache.iceberg.aws.glue.GlueCatalog"),
                    true,
                    -1));

            tEnv.executeSql("CREATE DATABASE IF NOT EXISTS `%s`.`%s`;"
                    .formatted(catalogName, dbName));
            tEnv.executeSql("DROP TABLE IF EXISTS `in`;");
            tEnv.executeSql("""
                    CREATE TABLE `in`(
                        `a` DOUBLE,
                        `b` DOUBLE,
                        `c` DOUBLE,
                        `d` STRING
                    ) WITH (
                        'connector' = 'filesystem',
                        'path' = 's3://mybucket/parquet/',
                        'format' = 'parquet',
                        'source.monitor-interval' = '5s'
                    );
                    """);
            tEnv.executeSql("""
                    CREATE TABLE IF NOT EXISTS `%s`.`%s`.`out`(
                        `a` DOUBLE,
                        `b` DOUBLE,
                        `c` DOUBLE,
                        `d` STRING
                    );
                    """.formatted(catalogName, dbName));
            tEnv.executeSql("""
                    INSERT INTO `%s`.`%s`.`out`
                    SELECT `a`, `b`, `c`, `d`
                    FROM `in`;
                    """.formatted(catalogName, dbName));

            env.executeAsync("IceBerger " + LocalDateTime.now());
        }
    }
}
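A detail that may be relevant (my understanding, not confirmed in this thread):
in the Table API, executeSql("INSERT ...") submits its own job immediately and
returns a TableResult, so the trailing env.executeAsync(...) submits a separate
DataStream job with no operators. A sketch of driving the INSERT through its
TableResult instead, assuming the tEnv and table names from the listing above:

```java
import org.apache.flink.table.api.TableResult;

// executeSql("INSERT ...") is itself the job submission; the returned
// TableResult exposes the JobClient for the submitted job.
TableResult result = tEnv.executeSql("""
        INSERT INTO `foo`.`baa`.`out`
        SELECT `a`, `b`, `c`, `d`
        FROM `in`;
        """);
result.getJobClient().ifPresent(c -> System.out.println("Submitted job " + c.getJobID()));
result.await();  // blocks; an unbounded streaming job never finishes normally
```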
________________________________
From: Xuyang <[email protected]>
Sent: 28 November 2023 03:02
To: Oxlade, Dan <[email protected]>
Cc: [email protected] <[email protected]>
Subject: [EXTERNAL] Re:Flink SQL and createRemoteEnvironment
Hi, Dan.
Can you provide more details?
> I'm seeing unexpected behavior where it appears like the sql is executed
> locally.
Did you find a MiniCluster started locally when running your program?
> In my case the remote environment is inside AWS and it doesn't appear to pick
> up the region and credentials unless I set the environment variables locally
I think you first need to make sure your local machine can connect to the AWS
environment.
Overall, I think `StreamExecutionEnvironment#createRemoteEnvironment` can meet
your requirements.
--
Best!
Xuyang
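[Side note on createRemoteEnvironment, an assumption on my part and not stated
in the thread: the client plans the SQL locally and ships only the resulting
job graph to the cluster, so connector and catalog classes must also be
available remotely; the jarFiles varargs can ship them with each submission. A
sketch, with host, port and jar path illustrative:]

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
        // The trailing varargs ship these jars to the cluster with every job,
        // so Iceberg/parquet classes resolve remotely as well as in the client.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(
                        "myflinkcluster.example.com", 443, conf,
                        "/path/to/my-job-shaded.jar");
        // ... build and submit the job as in the listing above ...
    }
}
```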
At 2023-11-28 03:49:44, "Oxlade, Dan" <[email protected]> wrote:
Hi,
If I use StreamExecutionEnvironment.createRemoteEnvironment and then
var tEnv = StreamTableEnvironment.create(env) from the resulting remote
StreamExecutionEnvironment, will any SQL executed using tEnv.executeSql be
executed remotely inside the Flink cluster?
I'm seeing unexpected behavior where it appears the SQL is executed locally. In
my case the remote environment is inside AWS, and it doesn't appear to pick up
the region and credentials unless I set the environment variables locally. I
want the job to run inside the cluster and use the remote AWS context,
including the auth credentials of the AWS task.
I feel like I might be fundamentally misunderstanding.
Thanks
Dan
T. Rowe Price International Ltd (registered number 3957748) is registered in
England and Wales with its registered office at Warwick Court, 5 Paternoster
Square, London EC4M 7DX. T. Rowe Price International Ltd is authorised and
regulated by the Financial Conduct Authority. The company has a branch in Dubai
International Financial Centre (regulated by the DFSA as a Representative
Office).
T. Rowe Price (including T. Rowe Price International Ltd and its affiliates)
and its associates do not provide legal or tax advice. Any tax-related
discussion contained in this e-mail, including any attachments, is not intended
or written to be used, and cannot be used, for the purpose of (i) avoiding any
tax penalties or (ii) promoting, marketing, or recommending to any other party
any transaction or matter addressed herein. Please consult your independent
legal counsel and/or professional tax advisor regarding any legal or tax issues
raised in this e-mail.
The contents of this e-mail and any attachments are intended solely for the use
of the named addressee(s) and may contain confidential and/or privileged
information. Any unauthorized use, copying, disclosure, or distribution of the
contents of this e-mail is strictly prohibited by the sender and may be
unlawful. If you are not the intended recipient, please notify the sender
immediately and delete this e-mail.