Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
thanks Tony Wei 于2021年11月2日周二 下午1:12写道: > Hi yidan, > > 你可以試試 SQL Hints [1]. > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/ > > > yidan zhao 於 2021年11月2日 週二 下午1:03寫道: > > > 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 > > 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在 > > flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。 > > 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。 > > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > > 'sink.partition-commit.trigger'='partition-time', > > > 'sink.partition-commit.delay'='1 h', > > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > > > Caizhi Weng 于2021年11月2日周二 上午10:47写道: > > > > > Hi! > > > > > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink > 就会自动读取 > > > hive 里表的结构等信息。但 kafka 的表定义仍然要写。 > > > > > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 > > > > > > yidan zhao 于2021年11月1日周一 下午3:05写道: > > > > > > > 如题,我看了官方文档,定义好kafka和hive表。 > > > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > > > > > > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > > > > user_id STRING, > > > > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED > AS > > > > parquet TBLPROPERTIES ( > > > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > > > 'sink.partition-commit.trigger'='partition-time', > > > > 'sink.partition-commit.delay'='1 h', > > > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > > > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > > > > user_id STRING, > > > > order_amount DOUBLE, > > > > log_ts TIMESTAMP(3), > > > > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > > > > watermark on TIMESTAMP column) WITH (...); > > > > > > > > > > > > > > > > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > > > > > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > > > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > > > > > > > > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 > > > > > > > > > >
Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
Hi yidan, 你可以試試 SQL Hints [1]. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/ yidan zhao 於 2021年11月2日 週二 下午1:03寫道: > 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 > 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在 > flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。 > 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。 > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > 'sink.partition-commit.trigger'='partition-time', > > 'sink.partition-commit.delay'='1 h', > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > Caizhi Weng 于2021年11月2日周二 上午10:47写道: > > > Hi! > > > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 > > hive 里表的结构等信息。但 kafka 的表定义仍然要写。 > > > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 > > > > yidan zhao 于2021年11月1日周一 下午3:05写道: > > > > > 如题,我看了官方文档,定义好kafka和hive表。 > > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > > > > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > > > user_id STRING, > > > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > > > parquet TBLPROPERTIES ( > > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > > 'sink.partition-commit.trigger'='partition-time', > > > 'sink.partition-commit.delay'='1 h', > > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > > > user_id STRING, > > > order_amount DOUBLE, > > > log_ts TIMESTAMP(3), > > > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > > > watermark on TIMESTAMP column) WITH (...); > > > > > > > > > > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > > > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > > > > > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 > > > > > >
Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在 flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。 > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file'); Caizhi Weng 于2021年11月2日周二 上午10:47写道: > Hi! > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 > hive 里表的结构等信息。但 kafka 的表定义仍然要写。 > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 > > yidan zhao 于2021年11月1日周一 下午3:05写道: > > > 如题,我看了官方文档,定义好kafka和hive表。 > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > > user_id STRING, > > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > > parquet TBLPROPERTIES ( > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > 'sink.partition-commit.trigger'='partition-time', > > 'sink.partition-commit.delay'='1 h', > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > > user_id STRING, > > order_amount DOUBLE, > > log_ts TIMESTAMP(3), > > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > > watermark on TIMESTAMP column) WITH (...); > > > > > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 > > >
Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
Hi! hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 hive 里表的结构等信息。但 kafka 的表定义仍然要写。 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 yidan zhao 于2021年11月1日周一 下午3:05写道: > 如题,我看了官方文档,定义好kafka和hive表。 > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > user_id STRING, > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file'); > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > user_id STRING, > order_amount DOUBLE, > log_ts TIMESTAMP(3), > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > watermark on TIMESTAMP column) WITH (...); > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 >
关于FlinkSQL从kafka读取数据写到hive的一些问题
如题,我看了官方文档,定义好kafka和hive表。 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 SET table.sql-dialect=hive;CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file'); SET table.sql-dialect=default;CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column) WITH (...); 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。