[jira] [Commented] (FLINK-27519) Fix duplicate names when there are multiple levels of over window aggregate
[ https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17533221#comment-17533221 ] jinfeng commented on FLINK-27519:

[~paul8263] Yes, there are already duplicate column names before the LOGICAL_CONVERTERS rule is applied. I think the easiest way is to modify FlinkLogicalOverAggregate if it cannot currently be fixed in Calcite.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
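For illustration, a minimal sketch of the kind of renaming such a fix could apply to the over aggregate's output field names; this is a hypothetical helper, not actual Flink or Calcite code:

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FieldNameDedup {

    // Append a numeric suffix to repeated names, e.g. [a, w0$o0, w0$o0] -> [a, w0$o0, w0$o0_0].
    static List<String> makeUnique(List<String> fieldNames) {
        Set<String> seen = new HashSet<>();
        List<String> unique = new ArrayList<>(fieldNames.size());
        for (String name : fieldNames) {
            String candidate = name;
            int suffix = 0;
            while (!seen.add(candidate)) {
                candidate = name + "_" + suffix++;
            }
            unique.add(candidate);
        }
        return unique;
    }
}
{code}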
[jira] [Commented] (FLINK-27519) Fix duplicate names when there are multiple levels of over window aggregate
[ https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532615#comment-17532615 ] jinfeng commented on FLINK-27519:

[~jark] [~TsReaper] Can you help confirm this issue? If it is a problem, I can help fix it.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27519) Fix duplicate names when there are multiple levels of over window aggregate
jinfeng created FLINK-27519:
-------------------------------

             Summary: Fix duplicate names when there are multiple levels of over window aggregate
                 Key: FLINK-27519
                 URL: https://issues.apache.org/jira/browse/FLINK-27519
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.15.0
            Reporter: jinfeng

This is a similar issue to [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121] and can be reproduced by adding this unit test as org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate:

{code:java}
@Test
def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
  val sql =
    """
      |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
      |  SELECT *, count(distinct(c)) over (partition by a order by b desc) AS uv
      |  FROM (
      |    SELECT *, count(*) over (partition by a, c order by b desc) AS pv
      |    FROM MyTable
      |  )
      |)
      |""".stripMargin
  util.verifyExecPlan(sql)
}
{code}

The error message:

{code:java}
org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [w0$o0]
    at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
    at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
    at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
    at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
{code}

I think we can add some logic in FlinkLogicalOverAggregate to avoid duplicate names in the output rowType.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition does not contain all partitioned fields
[ https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517778#comment-17517778 ] jinfeng commented on FLINK-20951:

[~jark] [~lirui] I encountered the same problem in version 1.12.2. After I applied FLINK-22202, the problem did not appear again, so I think FLINK-22202 can fix it.

> IllegalArgumentException when reading Hive parquet table if condition does not contain all partitioned fields
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20951
>                 URL: https://issues.apache.org/jira/browse/FLINK-20951
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.12.0
>         Environment: flink 1.12.0 release-12, sql-cli
>            Reporter: YUJIANBO
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, auto-deprioritized-minor, auto-unassigned
>
> The production hive table is partitioned by two fields: datekey and event.
> I have done this test in the Flink SQL CLI (in Spark SQL all queries are OK):
> (1) First:
> SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid = 'aa';  (OK)
> SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';  (Error)
> (2) Second:
> SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid = 'bb';  (OK)
> SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';  (Error)
> The exception is:
> {code}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     ... 1 more
> Caused by: java.lang.IllegalArgumentException
>     at java.nio.Buffer.position(Buffer.java:244)
>     at org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
>     at org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
>     at org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
>     at org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
>     at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
>     at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>     at
[jira] [Commented] (FLINK-26854) Cannot write to the database after a redirect
[ https://issues.apache.org/jira/browse/FLINK-26854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17512158#comment-17512158 ] jinfeng commented on FLINK-26854:

[~MingHao Liu] I don't think this is a problem with Flink. When writing to Doris using Stream Load, you should use httpclient version 4.5.3, as in [https://github.com/apache/incubator-doris/blob/master/samples/stream_load/java/DorisStreamLoad.java]:

{code:java}
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
{code}

You can depend on this version directly in your code and shade/relocate it, or you can simply use the Doris connector: https://github.com/apache/incubator-doris-flink-connector. Also, please describe your problem in English.

> Cannot write to the database after a redirect
> ---------------------------------------------
>
>                 Key: FLINK-26854
>                 URL: https://issues.apache.org/jira/browse/FLINK-26854
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.14.4
>         Environment: jdk1.8, flink1.14, flink-scala_2.11, flink-streaming-scala_2.11
>            Reporter: liuminghao
>            Priority: Major
>
> It is a pleasure to use the latest Flink 1.14, but along with the convenience I have also hit some problems. When I use HTTP to save data into the database (Doris), my database performs one redirect, which Flink 1.14 does not support: the request header produced by the redirect is treated as an error and the job is terminated. (Flink 1.14 does not support custom request headers, so for now I can only delete the header after the redirect.) Code: PushContent2Doris.pushContent( , headers = null)

-- This message was sent by Atlassian Jira (v8.20.1#820001)
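For reference, the Doris stream-load sample linked above follows the FE's redirect by customizing the client's redirect strategy; a sketch against the httpclient 4.5.x API (the exact setup in your job is an assumption):

{code:java}
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;

public class DorisClientSketch {

    // Build a client that follows redirects even for PUT requests,
    // which is what Doris stream load needs.
    static CloseableHttpClient buildClient() {
        return HttpClients.custom()
                .setRedirectStrategy(
                        new DefaultRedirectStrategy() {
                            @Override
                            protected boolean isRedirectable(String method) {
                                return true;
                            }
                        })
                .build();
    }
}
{code}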
[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns
[ https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511301#comment-17511301 ] jinfeng commented on FLINK-26712:

[~twalthr] Thanks for your reply; maybe I was overcomplicating it. Looking forward to this feature. :D

> Metadata keys should not conflict with physical columns
> --------------------------------------------------------
>
>                 Key: FLINK-26712
>                 URL: https://issues.apache.org/jira/browse/FLINK-26712
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>
> If you have a field called timestamp and in addition want to read the timestamp from the metadata:
> {code}
> CREATE TABLE animal_sightings_with_metadata (
>   `timestamp` TIMESTAMP(3),
>   `name` STRING,
>   `country` STRING,
>   `number` INT,
>   `append_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `timestamp-type` STRING METADATA,
>   `leader-epoch` INT METADATA,
>   `topic` STRING METADATA
> )
> {code}
> This gives:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [timestamp]
> {code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz
[ https://issues.apache.org/jira/browse/FLINK-26810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511288#comment-17511288 ] jinfeng commented on FLINK-26810:

[~martijnvisser] I think this is a problem with the Elasticsearch connector, because the conversion of TimestampData to String is implemented inside the connector, not in the Table Runtime layer, as shown in the code below:

{code:java}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
    return (value, dateTimeFormatter) -> {
        TimestampData indexField = (TimestampData) value;
        return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
    };
{code}

When formatting TimestampData as a String, the session's local time zone is not used.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
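A standalone sketch of the expected behavior if the connector were handed the session time zone (e.g. from table.local-time-zone) instead of hard-coding ZoneOffset.UTC; the method and parameter names here are illustrative:

{code:java}
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DynamicIndexZoneSketch {

    // Format the TIMESTAMP_LTZ instant in the session zone rather than UTC.
    static String indexSuffix(Instant timestamp, ZoneId sessionZone, DateTimeFormatter formatter) {
        return timestamp.atZone(sessionZone).format(formatter);
    }

    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        Instant ts = Instant.parse("2022-03-22T17:00:00Z");
        // In Asia/Shanghai this instant already falls on the next day.
        System.out.println(indexSuffix(ts, ZoneId.of("Asia/Shanghai"), fmt)); // 2022-03-23
        System.out.println(indexSuffix(ts, ZoneId.of("UTC"), fmt));           // 2022-03-22
    }
}
{code}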
[jira] [Commented] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz
[ https://issues.apache.org/jira/browse/FLINK-26810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510764#comment-17510764 ] jinfeng commented on FLINK-26810:

[~fpaul] Can you help confirm this issue?

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz
jinfeng created FLINK-26810:
-------------------------------

             Summary: The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz
                 Key: FLINK-26810
                 URL: https://issues.apache.org/jira/browse/FLINK-26810
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
            Reporter: jinfeng
         Attachments: 截屏2022-03-23 上午12.48.02.png

When using a TIMESTAMP_WITH_LOCAL_TIME_ZONE field to generate a dynamic index, it will always use the UTC time zone.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns
[ https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509787#comment-17509787 ] jinfeng commented on FLINK-26712:

[~twalthr] Currently, when there is a metadata column, the planner scans which metadata columns are used and passes those fields to the source; the source then generates the corresponding columns according to the metadata key, and a project node is generated to map each metadata column to its actual field. This means that even if multiple fields reference the same metadata key, the source only needs to produce one metadata column. Changing this would mean generating multiple metadata columns on the source side and letting the project node's fields refer to them. Will this affect the compatibility of SQL plan generation?

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns
[ https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508213#comment-17508213 ] jinfeng commented on FLINK-26712:

[~twalthr] I am interested in this; please assign this ticket to me.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492949#comment-17492949 ] jinfeng commented on FLINK-26016:

[~luoyuxia] I have submitted a PR; please take the time to review the code. Thanks.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492374#comment-17492374 ] jinfeng commented on FLINK-26016:

[~luoyuxia] Thanks for your reply. I think the root cause is the ColumnarRowData::getString() method:

{code:java}
@Override
public StringData getString(int pos) {
    Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
    return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
}
{code}

When reading StringData, the data is not copied, and the VectorizedColumnBatch is reused when reading different batches. In the lookup join function, the bytes behind a key that was read into the cache earlier are overwritten by the bytes of a key read later.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
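The aliasing described above can be reproduced in isolation; a small example assuming, as the comment states, that StringData.fromBytes wraps the array without copying:

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.flink.table.data.StringData;

public class ReusedBufferSketch {

    public static void main(String[] args) {
        byte[] buffer = "key-1".getBytes(StandardCharsets.UTF_8);
        // The StringData is a view over `buffer`, not a copy.
        StringData cachedKey = StringData.fromBytes(buffer);
        // Simulate the reused VectorizedColumnBatch overwriting the buffer.
        buffer[4] = '2';
        // The key that was "cached" now reads "key-2".
        System.out.println(cachedKey);
    }
}
{code}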
[jira] [Comment Edited] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492326#comment-17492326 ] jinfeng edited comment on FLINK-26016 at 2/15/22, 2:41 AM:

[~MartijnVisser] [~luoyuxia] Can you help take a look? If it's a bug, I can help fix it.

was (Author: hackergin): [~MartijnVisser] Can you help take a look?

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492326#comment-17492326 ] jinfeng commented on FLINK-26016:

[~MartijnVisser] Can you help take a look?

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17488974#comment-17488974 ] jinfeng commented on FLINK-26016:

cc [~lirui]

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
jinfeng created FLINK-26016:
-------------------------------

             Summary: FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
                 Key: FLINK-26016
                 URL: https://issues.apache.org/jira/browse/FLINK-26016
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.14.3
            Reporter: jinfeng

When I use a parquet hive table as the lookup table, some records cannot be joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase:

{code:java}
// create the hive table with columnar storage.
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // constructs test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
    // write at least 2048 records to the test table.
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // constructs test data using values table
    TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
{code}

The problem may be caused by the following code:

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}

It can be fixed with the following modification:

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-24571) elasticsearch dynamic index support the use of proctime
[ https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17487828#comment-17487828 ] jinfeng edited comment on FLINK-24571 at 2/7/22, 2:17 AM:

I'll update the PR this week, to make sure the proctime dynamic index only works on an append-only stream.

was (Author: hackergin): I'll update the PR this week, to make sure the dynamic proctime index only works on an append-only stream.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime
[ https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17487828#comment-17487828 ] jinfeng commented on FLINK-24571:

I'll update the PR this week, to make sure the dynamic proctime index only works on an append-only stream.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24571) elasticsearch dynamic index support the use of proctime
[ https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-24571:
----------------------------
    Labels: pull-request-available  (was: pull-request-available stale-assigned)

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter
[ https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476927#comment-17476927 ] jinfeng commented on FLINK-23545:

[~airblader] [~twalthr] This feature seems to be supported in 1.15. Are there any tickets tracking the removal of the legacy connectors? If not, I can help create one.

> Use new schema in SqlCreateTableConverter
> ------------------------------------------
>
>                 Key: FLINK-23545
>                 URL: https://issues.apache.org/jira/browse/FLINK-23545
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>            Reporter: jinfeng
>            Assignee: jinfeng
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.15.0
>
> In order to support column comments in the SQL CREATE TABLE DDL, we should use the new schema in SqlCreateTableConverter.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25419) Support the metadata column to generate dynamic index
[ https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463762#comment-17463762 ] jinfeng commented on FLINK-25419:

cc [~fpaul] [~matriv]

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25419) Support the metadata column to generate dynamic index
[ https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-25419:
----------------------------
    Description:
        As mentioned in [https://github.com/apache/flink/pull/18058] We can implement metadata column to increase the flexibility of using dynamic indexes .
    was:
        As mentioned in [https://github.com/apache/flink/pull/18058 |https://github.com/apache/flink/pull/18058,] We can implement metadata column to increase the flexibility of using dynamic indexes .

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25419) Support the metadata column to generate dynamic index
[ https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-25419:
----------------------------
    Description:
        As mentioned in [https://github.com/apache/flink/pull/18058 |https://github.com/apache/flink/pull/18058,] We can implement metadata column to increase the flexibility of using dynamic indexes .
    was:
        As mentioned in [https://github.com/apache/flink/pull/18058,] We can implement metadata column to increase the flexibility of using dynamic indexes .

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25419) Support the metadata column to generate dynamic index
jinfeng created FLINK-25419:
-------------------------------

             Summary: Support the metadata column to generate dynamic index
                 Key: FLINK-25419
                 URL: https://issues.apache.org/jira/browse/FLINK-25419
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / ElasticSearch
            Reporter: jinfeng
             Fix For: 1.15.0

As mentioned in [https://github.com/apache/flink/pull/18058], we can implement a metadata column to increase the flexibility of using dynamic indexes.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-21068) Add new timeout options for Elasticsearch connector
[ https://issues.apache.org/jira/browse/FLINK-21068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458356#comment-17458356 ] jinfeng commented on FLINK-21068:

[~fpaul] I submitted the pull request [https://github.com/apache/flink/pull/18097]

> Add new timeout options for Elasticsearch connector
> ----------------------------------------------------
>
>                 Key: FLINK-21068
>                 URL: https://issues.apache.org/jira/browse/FLINK-21068
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.12.1
>            Reporter: jinfeng
>            Assignee: jinfeng
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>
> Currently, connection.max-retry-timeout does not seem to work with the new Elasticsearch connector; the Elasticsearch community has removed setMaxRetryTimeoutMillis from RestClientBuilder. We can set the timeout options when creating the RestHighLevelClient in Elasticsearch7ApiCallBridge, like:
> {code:java}
> @Override
> public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
>     RestClientBuilder builder =
>             RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
>     builder.setRequestConfigCallback(
>             new RestClientBuilder.RequestConfigCallback() {
>                 @Override
>                 public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
>                     if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_TIMEOUT)) {
>                         builder.setConnectTimeout(
>                                 Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_TIMEOUT)));
>                     }
>                     if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)) {
>                         builder.setSocketTimeout(
>                                 Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)));
>                     }
>                     if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)) {
>                         builder.setConnectionRequestTimeout(
>                                 Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)));
>                     }
>                     return builder;
>                 }
>             });
> {code}
> So we can add three table options to configure the Elasticsearch timeouts:
> connection.timeout
> connection.socket-timeout
> connection.request-timeout

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-21068) Add new timeout options for Elasticsearch connector
[ https://issues.apache.org/jira/browse/FLINK-21068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17432453#comment-17432453 ] jinfeng commented on FLINK-21068:

[~karmagyz] Sure, I can take this issue.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
[ https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17431115#comment-17431115 ] jinfeng commented on FLINK-23169:

[~xtsong] Thank you for your patience; I totally agree with you. There may indeed be many other ways of grouping staging directories. If more people hit this problem in the future, we can consider adding this feature. Currently we can use 'flink run -Dyarn.staging-directory=/flink/staging/user1' to meet our requirements.

> Support user-level app staging directory when yarn.staging-directory is specified
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-23169
>                 URL: https://issues.apache.org/jira/browse/FLINK-23169
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / YARN
>            Reporter: jinfeng
>            Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same directory as the staging directory, which may not be friendly for a job platform that submits jobs for different users. I propose to use a user-level directory by default when yarn.staging-directory is specified. We only need to make small changes to the `getStagingDir` function in YarnClusterDescriptor.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
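A sketch of what the proposed change to getStagingDir might have looked like; hypothetical code using Hadoop's Path and UserGroupInformation, since the real method would also need to handle permissions and cleanup:

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class StagingDirSketch {

    // Resolve a per-user staging directory under the configured root,
    // e.g. /flink/staging -> /flink/staging/user1.
    static Path userStagingDir(Path configuredStagingDir) throws IOException {
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        return new Path(configuredStagingDir, user);
    }
}
{code}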
[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
[ https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17431022#comment-17431022 ] jinfeng commented on FLINK-23169:

Hi [~xtsong],
1. Putting them together doesn't seem to be a big problem, but I think it is more convenient to manage the directory's quota and permissions per user.
2. Yes, we can configure a different staging directory when submitting each job, just like configuring the checkpoint directory. For a single user this may not be a big problem, but a Flink job platform should consider managing this config for different users, so I think it may be a general requirement. Here is the related Spark JIRA: https://issues.apache.org/jira/browse/SPARK-26877

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime
[ https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17429872#comment-17429872 ] jinfeng commented on FLINK-24571:

[~MartijnVisser] Thanks for the reminder; I will study the implementation of the latest ES sink connector.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime
[ https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17429810#comment-17429810 ] jinfeng commented on FLINK-24571: - [~jark] Thanks for the great suggestion; I think using a pattern is much better. I can help to implement this feature. > elasticsearch dynamic index support the use of proctime > > > Key: FLINK-24571 > URL: https://issues.apache.org/jira/browse/FLINK-24571 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Reporter: jinfeng >Priority: Major > > When using the Elasticsearch dynamic index, we must specify a field. If there is > no time-attribute field, we can't use the dynamic index, so supporting a dynamic > index based on proctime may be a good choice. > Before implementing this feature, we should discuss the proctime index pattern. > > The current dynamic index is: > {code:sql} > Create table esSink ( > f1 timestamp(3) > ) with ( > 'index' = 'myusers_{f1|yyyy-MM-dd}' > ); > {code} > When using proctime as the dynamic index, it would look like this: > {code:sql} > Create table esSink ( >f1 varchar > ) with ( > 'index' = 'myusers_{|yyyy-MM-dd}' > ); > {code} > When the field name is not specified, we use proctime to generate the dynamic > index. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24571) elasticsearch dynamic index support the use of proctime
jinfeng created FLINK-24571: --- Summary: elasticsearch dynamic index support the use of proctime Key: FLINK-24571 URL: https://issues.apache.org/jira/browse/FLINK-24571 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: jinfeng When using the Elasticsearch dynamic index, we must specify a field. If there is no time-attribute field, we can't use the dynamic index, so supporting a dynamic index based on proctime may be a good choice. Before implementing this feature, we should discuss the proctime index pattern. The current dynamic index is: {code:sql} Create table esSink ( f1 timestamp(3) ) with ( 'index' = 'myusers_{f1|yyyy-MM-dd}' ); {code} When using proctime as the dynamic index, it would look like this: {code:sql} Create table esSink ( f1 varchar ) with ( 'index' = 'myusers_{|yyyy-MM-dd}' ); {code} When the field name is not specified, we use proctime to generate the dynamic index. -- This message was sent by Atlassian Jira (v8.3.4#803005)
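To make the proposal concrete, here is a tiny illustrative helper (not connector code; all names below are made up for this sketch) showing how the suffix after '|' could be formatted from processing time when no field name is given:
{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative sketch only: resolve 'myusers_{|yyyy-MM-dd}' style patterns
// from processing time instead of a row field.
public final class ProctimeIndexSketch {

    static String resolveIndex(String prefix, String datePattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
        // "now" stands in for the record's processing time
        return prefix + LocalDateTime.now().format(formatter);
    }

    public static void main(String[] args) {
        // prints e.g. myusers_2021-10-15
        System.out.println(resolveIndex("myusers_", "yyyy-MM-dd"));
    }
}
{code}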
[jira] [Created] (FLINK-24072) Add support for setting default headers in elasticsearch connector
jinfeng created FLINK-24072: --- Summary: Add support for setting default headers in elasticsearch connector Key: FLINK-24072 URL: https://issues.apache.org/jira/browse/FLINK-24072 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: jinfeng If we add support for setting default headers, we can expose header options in the SQL options. The DDL would look like this: {code:sql} create table es-sink ( a varchar, b varchar ) with ( 'connector' = 'elasticsearch-7', 'connection.default-headers' = 'Authorization:xxx' ); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
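The Elasticsearch REST client already supports default headers, so the main work is parsing and forwarding the option; a minimal sketch (the wiring from the table option to this call is hypothetical):
{code:java}
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

public final class DefaultHeadersSketch {

    // Sketch: the header value would be parsed from the proposed
    // 'connection.default-headers' table option.
    static RestClientBuilder builderWithDefaultHeaders() {
        RestClientBuilder builder =
                RestClient.builder(new HttpHost("localhost", 9200, "http"));
        builder.setDefaultHeaders(
                new Header[] {new BasicHeader("Authorization", "xxx")});
        return builder;
    }
}
{code}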
[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter
[ https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395179#comment-17395179 ] jinfeng commented on FLINK-23545: - [~twalthr] I found [FLINK-23513|https://issues.apache.org/jira/browse/FLINK-23513] has removed many legacy connectors, but there is still a lot of legacy code left. When will we completely remove the legacy connectors? Maybe I can help with the remaining work. > Use new schema in SqlCreateTableConverter > - > > Key: FLINK-23545 > URL: https://issues.apache.org/jira/browse/FLINK-23545 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: jinfeng >Assignee: jinfeng >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > In order to support column comments in the SQL CREATE TABLE DDL, we should use the > new Schema in SqlCreateTableConverter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter
[ https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393969#comment-17393969 ] jinfeng commented on FLINK-23545: - [~jark] Thanks for your reply; maybe I didn't express myself clearly. After using DefaultCatalogTable instead of CatalogTableImpl, legacy connectors will not work, as the catalogTable is a DefaultCatalogTable (an unresolved table). When executing {code:java} TableFactoryUtil.findAndCreateTableSource( schemaTable.getCatalog(), schemaTable.getTableIdentifier(), catalogTable, new Configuration(), schemaTable.isTemporary()); {code} it will throw the exception `Only a resolved catalog table can be serialized into a map of string properties`. The "COLLECTION" table may be another problem: TestCollectionTableFactory is now a legacy connector, but it has 'new style' table options. > Use new schema in SqlCreateTableConverter > - > > Key: FLINK-23545 > URL: https://issues.apache.org/jira/browse/FLINK-23545 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: jinfeng >Assignee: jinfeng >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > In order to support column comments in the SQL CREATE TABLE DDL, we should use the > new Schema in SqlCreateTableConverter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter
[ https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393678#comment-17393678 ] jinfeng commented on FLINK-23545: - While implementing the code, I found a few problems. After constructing the schema and some other attributes, I use the `CatalogTable.of` method to create the CatalogTable, which is a DefaultCatalogTable. {code:java} CatalogTable.of(mergedSchema, tableComment, partitionKeys, mergedOptions); {code} But when parsing a DDL like this {code:sql} CREATE TABLE testCollection ( field1 VARCHAR ) with ( 'connector' = 'COLLECTION' ) {code} COLLECTION is a legacy table. When generating the rel node, it will first execute the `isLegacySourceOptions(CatalogTable catalogTable, CatalogSchemaTable schemaTable)` method, which throws the exception `Only a resolved catalog table can be serialized into a map of string properties.` Next it will create a CatalogSourceTable to construct the dynamic table, and as COLLECTION is a legacy table, the job will throw `Unable to create a source for reading table`. Also, I found the new TableDescriptor has the same problem when it is used to construct a legacy table with new-style table options. > Use new schema in SqlCreateTableConverter > - > > Key: FLINK-23545 > URL: https://issues.apache.org/jira/browse/FLINK-23545 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: jinfeng >Assignee: jinfeng >Priority: Major > Fix For: 1.14.0 > > > In order to support column comments in the SQL CREATE TABLE DDL, we should use the > new Schema in SqlCreateTableConverter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
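One possible direction, sketched under the assumption that a schema resolver is reachable at that point in the planner (this is not committed API usage in that code path): resolve the table before the legacy path tries to serialize it.
{code:java}
import java.util.Map;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;

// Sketch: 'schemaResolver' is assumed to be obtainable from the catalog manager.
ResolvedSchema resolvedSchema =
        catalogTable.getUnresolvedSchema().resolve(schemaResolver);
ResolvedCatalogTable resolvedTable =
        new ResolvedCatalogTable(catalogTable, resolvedSchema);
// toProperties() no longer fails, because the table is now resolved.
Map<String, String> properties = resolvedTable.toProperties();
{code}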
[jira] [Commented] (FLINK-18958) Lose column comment when create table
[ https://issues.apache.org/jira/browse/FLINK-18958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17389879#comment-17389879 ] jinfeng commented on FLINK-18958: - [~jark]. I create https://issues.apache.org/jira/browse/FLINK-23545 to replace tableSchema with new Schema first. > Lose column comment when create table > - > > Key: FLINK-18958 > URL: https://issues.apache.org/jira/browse/FLINK-18958 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.0 >Reporter: Shengkai Fang >Assignee: jinfeng >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.14.0 > > > Currently, table column will not store column comment and user can't see > column comment when use {{describe table}} sql. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23545) Use new schema in SqlCreateTableConverter
jinfeng created FLINK-23545: --- Summary: Use new schema in SqlCreateTableConverter Key: FLINK-23545 URL: https://issues.apache.org/jira/browse/FLINK-23545 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: jinfeng Fix For: 1.14.0 In order to support column comments in the SQL CREATE TABLE DDL, we should use the new Schema in SqlCreateTableConverter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18958) Lose column comment when create table
[ https://issues.apache.org/jira/browse/FLINK-18958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17388584#comment-17388584 ] jinfeng commented on FLINK-18958: - [~jark] Sure, please assign this ticket to me. > Lose column comment when create table > - > > Key: FLINK-18958 > URL: https://issues.apache.org/jira/browse/FLINK-18958 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.0 >Reporter: Shengkai Fang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.14.0 > > > Currently, table column will not store column comment and user can't see > column comment when use {{describe table}} sql. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22936) Support column comment in Schema and ResolvedSchema
[ https://issues.apache.org/jira/browse/FLINK-22936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17386868#comment-17386868 ] jinfeng commented on FLINK-22936: - [~jark] Introducing a {{withComment(comment)}} method on the {{Column}} class is much better. I have submitted the PR; please take time to review the code when you are free. > Support column comment in Schema and ResolvedSchema > --- > > Key: FLINK-22936 > URL: https://issues.apache.org/jira/browse/FLINK-22936 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jark Wu >Assignee: jinfeng >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > In order to support column comment in catalog (FLINK-18958), we should first > support column comment in Schema and ResolvedSchema. > The API is up for discussion. Currently, we already have 10 methods for adding a > column in {{Schema}}. If we want to support column comment for each kind of > column, the number of column methods may double. It's not easy to maintain in > the long term, and makes the API complex. > Another alternative is adding a new method {{comment(String)}} which will > apply comment to the previous column. This is not a good builder style, but > can make the building concise. > For example, > {code} > Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)).comment("log > timestamp") > .columnByExpression("proctime", > "PROCTIME()").comment("processing time") > .watermark("ts", "ts - INTERVAL '5' SECOND") > .build() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
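For readers following along, the direction agreed on above looks roughly like this (a sketch of the API from the PR, subject to review):
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;

// The comment lives on Column itself, so ResolvedSchema keeps it as well.
Column tsColumn =
        Column.physical("ts", DataTypes.TIMESTAMP(3)).withComment("log timestamp");
{code}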
[jira] [Commented] (FLINK-22936) Support column comment in Schema and ResolvedSchema
[ https://issues.apache.org/jira/browse/FLINK-22936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385988#comment-17385988 ] jinfeng commented on FLINK-22936: - [~jark] [~twalthr] So the final solution is to add a withComment method which applies the comment to the previous column? Then we also need to add a setComment method to the Column class. I can help implement this. > Support column comment in Schema and ResolvedSchema > --- > > Key: FLINK-22936 > URL: https://issues.apache.org/jira/browse/FLINK-22936 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jark Wu >Priority: Major > Fix For: 1.14.0 > > > In order to support column comment in catalog (FLINK-18958), we should first > support column comment in Schema and ResolvedSchema. > The API is up for discussion. Currently, we already have 10 methods for adding a > column in {{Schema}}. If we want to support column comment for each kind of > column, the number of column methods may double. It's not easy to maintain in > the long term, and makes the API complex. > Another alternative is adding a new method {{comment(String)}} which will > apply comment to the previous column. This is not a good builder style, but > can make the building concise. > For example, > {code} > Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)).comment("log > timestamp") > .columnByExpression("proctime", > "PROCTIME()").comment("processing time") > .watermark("ts", "ts - INTERVAL '5' SECOND") > .build() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
[ https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377350#comment-17377350 ] jinfeng edited comment on FLINK-23169 at 7/8/21, 12:22 PM: --- [~fly_in_gis] thanks for your reply. When we use a Flink job management platform to submit jobs from different users, we config the 'yarn.staging-directory' ,for example. /flink/staging/ 1. if user1 submit the job, and user1 will create the /flink/staging/.flink/application_ user1 supergroup drwx--x--x 2. if user2 submit the job, and user2 can't use /flink/staging/.flink , becase directory `.flink` belongs to user1. 3. if we create /flink/staging/.flink first, there is also another problem, user1, user2, user3.. 's application staging directory will be created in the same directory . The reason why we don't use the default home directory is that we want to provided a common directory to different users. If not, every new user need to create their own home directory and apply quota. I notice that the spark also provided a config to specific the yarn staging directory, and it will be divided by user directory by default. was (Author: hackergin): [~fly_in_gis] thank for you reply. When we use a Flink job management platform to submit jobs from different users, we config the 'yarn.staging-directory' ,for example. /flink/staging/ 1. if user1 submit the job, and user1 will create the /flink/staging/.flink/application_ user1 supergroup drwx--x--x 2. if user2 submit the job, and user2 can't use /flink/staging/.flink , becase directory `.flink` belongs to user1. 3. if we create /flink/staging/.flink first, there is also another problem, user1, user2, user3.. 's application staging directory will be created in the same directory . The reason why we don't use the default home directory is that we want to provided a common directory to different users. If not, every new user need to create their own home directory and apply quota. I notice that the spark also provided a config to specific the yarn staging directory, and it will be divided by user directory by default. > Support user-level app staging directory when yarn.staging-directory is > specified > - > > Key: FLINK-23169 > URL: https://issues.apache.org/jira/browse/FLINK-23169 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Reporter: jinfeng >Priority: Major > > When yarn.staging-directory is specified, different users will use the same > directory as the staging directory. It may not friendly for a job platform > to submit job for different users. I propose to use the user-level directory > by default when yarn.staging-directory is specified. We only need to make > small changes for `getStagingDir` function in > YarnClusterDescriptor -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
[ https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377350#comment-17377350 ] jinfeng commented on FLINK-23169: - [~fly_in_gis] Thanks for your reply. When we use a Flink job management platform to submit jobs from different users, we configure 'yarn.staging-directory', for example /flink/staging/ 1. If user1 submits a job, user1 will create /flink/staging/.flink/application_ user1 supergroup drwx--x--x 2. If user2 then submits a job, user2 can't use /flink/staging/.flink, because the directory `.flink` belongs to user1. 3. If we create /flink/staging/.flink first, there is also another problem: user1, user2, user3...'s application staging directories will all be created in the same directory. The reason why we don't use the default home directory is that we want to provide a common directory to different users. If not, every new user needs to create their own home directory and apply for quota. I notice that Spark also provides a config to specify the YARN staging directory, and it is divided into per-user directories by default. > Support user-level app staging directory when yarn.staging-directory is > specified > - > > Key: FLINK-23169 > URL: https://issues.apache.org/jira/browse/FLINK-23169 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Reporter: jinfeng >Priority: Major > > When yarn.staging-directory is specified, different users will use the same > directory as the staging directory. It may not be friendly for a job platform > to submit jobs for different users. I propose to use the user-level directory > by default when yarn.staging-directory is specified. We only need to make > small changes to the `getStagingDir` function in > YarnClusterDescriptor -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
[ https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371886#comment-17371886 ] jinfeng commented on FLINK-23169: - [~fly_in_gis] please have a look at this ticket > Support user-level app staging directory when yarn.staging-directory is > specified > - > > Key: FLINK-23169 > URL: https://issues.apache.org/jira/browse/FLINK-23169 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Reporter: jinfeng >Priority: Major > > When yarn.staging-directory is specified, different users will use the same > directory as the staging directory. It may not friendly for a job platform > to submit job for different users. I propose to use the user-level directory > by default when yarn.staging-directory is specified. We only need to make > small changes for `getStagingDir` function in > YarnClusterDescriptor -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified
jinfeng created FLINK-23169: --- Summary: Support user-level app staging directory when yarn.staging-directory is specified Key: FLINK-23169 URL: https://issues.apache.org/jira/browse/FLINK-23169 Project: Flink Issue Type: Improvement Components: Deployment / YARN Reporter: jinfeng When yarn.staging-directory is specified, different users will use the same directory as the staging directory. It may not be friendly for a job platform to submit jobs for different users. I propose to use the user-level directory by default when yarn.staging-directory is specified. We only need to make small changes to the `getStagingDir` function in YarnClusterDescriptor -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22428) Translate timezone page to Chinese
[ https://issues.apache.org/jira/browse/FLINK-22428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330438#comment-17330438 ] jinfeng commented on FLINK-22428: - [~Leonard Xu] I can help to translate. > Translate timezone page to Chinese > -- > > Key: FLINK-22428 > URL: https://issues.apache.org/jira/browse/FLINK-22428 > Project: Flink > Issue Type: Task > Components: Documentation, Table SQL / API >Reporter: Leonard Xu >Priority: Major > > the file path is docs/content.zh/docs/dev/table/concepts/timezone.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22178) Support ignore-first-line option in new csv format
[ https://issues.apache.org/jira/browse/FLINK-22178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17318678#comment-17318678 ] jinfeng commented on FLINK-22178: - [~ykt836] I think the ignore-first-line option is needed only when we use the filesystem connector to read CSV files, so we only need to modify AbstractCsvInputFormat and ignore the first line in the open method when splitStart == 0, instead of modifying the CsvRowDataDeserializationSchema. I don't know if my understanding is correct. > Support ignore-first-line option in new csv format > -- > > Key: FLINK-22178 > URL: https://issues.apache.org/jira/browse/FLINK-22178 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.13.0 >Reporter: Kurt Young >Assignee: jinfeng >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
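A minimal sketch of that idea (the `ignoreFirstLine` flag and its wiring are assumptions for illustration): only the split that starts at byte 0 contains the header, so skip one line there before the CSV parser sees the stream.
{code:java}
import java.io.IOException;
import org.apache.flink.core.fs.FileInputSplit;

// Inside AbstractCsvInputFormat; 'stream' is the input stream inherited
// from FileInputFormat.
@Override
public void open(FileInputSplit split) throws IOException {
    super.open(split);
    if (ignoreFirstLine && split.getStart() == 0) {
        int b;
        // Discard everything up to and including the first line break.
        while ((b = stream.read()) != -1 && b != '\n') {
            // skip header bytes
        }
    }
}
{code}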
[jira] [Commented] (FLINK-22178) Support ignore-first-line option in new csv format
[ https://issues.apache.org/jira/browse/FLINK-22178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17317954#comment-17317954 ] jinfeng commented on FLINK-22178: - [~ykt836] I can help to implement this, please assign this to me. > Support ignore-first-line option in new csv format > -- > > Key: FLINK-22178 > URL: https://issues.apache.org/jira/browse/FLINK-22178 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.13.0 >Reporter: Kurt Young >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22064) Don't submit statement set when no insert is added in the sql client
[ https://issues.apache.org/jira/browse/FLINK-22064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312280#comment-17312280 ] jinfeng commented on FLINK-22064: - [~jark] I can help to fix this. > Don't submit statement set when no insert is added in the sql client > - > > Key: FLINK-22064 > URL: https://issues.apache.org/jira/browse/FLINK-22064 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.0 >Reporter: Shengkai Fang >Priority: Major > Fix For: 1.13.0 > > > !https://static.dingtalk.com/media/lALPD4Bhs9KtV3jM4s0F2A_1496_226.png_720x720g.jpg?renderWidth=1496=226=1=0=im! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table
[ https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312144#comment-17312144 ] jinfeng commented on FLINK-22043: - Thanks [~yunta] > Introduce thrift format to (de)serialize thrift message only with table > - > > Key: FLINK-22043 > URL: https://issues.apache.org/jira/browse/FLINK-22043 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: jinfeng >Priority: Major > > We have implement thrift format in our company. I found that there was a mail > discuss > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html > , but no one seems to have time to implement it. > [~yuyang08] has done most of the things , I can help implement it based on > https://github.com/apache/flink/pull/8067 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table
[ https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312115#comment-17312115 ] jinfeng commented on FLINK-22043: - [~yunta] I agree that it should not be too complicated, and I can help to implement the both. > Introduce thrift format to (de)serialize thrift message only with table > - > > Key: FLINK-22043 > URL: https://issues.apache.org/jira/browse/FLINK-22043 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: jinfeng >Priority: Major > > We have implement thrift format in our company. I found that there was a mail > discuss > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html > , but no one seems to have time to implement it. > [~yuyang08] has done most of the things , I can help implement it based on > https://github.com/apache/flink/pull/8067 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table
[ https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311674#comment-17311674 ] jinfeng commented on FLINK-22043: - It may be simpler if we only consider the table, maybe we can open another jira to define a general thrift serializer. > Introduce thrift format to (de)serialize thrift message only with table > - > > Key: FLINK-22043 > URL: https://issues.apache.org/jira/browse/FLINK-22043 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: jinfeng >Priority: Major > > We have implement thrift format in our company. I found that there was a mail > discuss > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html > , but no one seems to have time to implement it. > [~yuyang08] has done most of the things , I can help implement it based on > https://github.com/apache/flink/pull/8067 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table
[ https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311522#comment-17311522 ] jinfeng commented on FLINK-22043: - [~yunta] I mean only support for sql, and we can use thrift format in flink SQL DDL. > Introduce thrift format to (de)serialize thrift message only with table > - > > Key: FLINK-22043 > URL: https://issues.apache.org/jira/browse/FLINK-22043 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: jinfeng >Priority: Major > > We have implement thrift format in our company. I found that there was a mail > discuss > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html > , but no one seems to have time to implement it. > [~yuyang08] has done most of the things , I can help implement it based on > https://github.com/apache/flink/pull/8067 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table
jinfeng created FLINK-22043: --- Summary: Introduce thrift format to (de)serialize thrift message only with table Key: FLINK-22043 URL: https://issues.apache.org/jira/browse/FLINK-22043 Project: Flink Issue Type: New Feature Reporter: jinfeng We have implemented a thrift format in our company. I found that there was a mailing list discussion http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html , but no one seems to have had time to implement it. [~yuyang08] has done most of the work; I can help implement it based on https://github.com/apache/flink/pull/8067 -- This message was sent by Atlassian Jira (v8.3.4#803005)
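To sketch what this could look like for SQL users (all option names below are illustrative; settling them is exactly what this ticket is for):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ThriftDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Hypothetical DDL: a 'thrift' format pointed at a generated class.
        tableEnv.executeSql(
                "CREATE TABLE thriftSource ("
                        + "  user_id BIGINT,"
                        + "  event_name STRING"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'events',"
                        + "  'format' = 'thrift',"
                        + "  'thrift.class' = 'com.example.thriftgen.Event'"
                        + ")");
    }
}
{code}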
[jira] [Commented] (FLINK-21741) Support SHOW JARS statement in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-21741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17310690#comment-17310690 ] jinfeng commented on FLINK-21741: - [~jark] I can help to implement this feature. > Support SHOW JARS statement in SQL Client > - > > Key: FLINK-21741 > URL: https://issues.apache.org/jira/browse/FLINK-21741 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client >Reporter: Jark Wu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21906) Support computed column syntax for Hive DDL dialect
[ https://issues.apache.org/jira/browse/FLINK-21906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17310411#comment-17310411 ] jinfeng commented on FLINK-21906: - I studied the code from FLIP-152; what we need to do is use ANTLR in the flink-hive-connector to implement the watermark syntax and convert the AST to a CreateTableOperation. Maybe I can give it a try. This feature does not target the 1.13 release, which means that I have more time to complete it; that's fine > Support computed column syntax for Hive DDL dialect > --- > > Key: FLINK-21906 > URL: https://issues.apache.org/jira/browse/FLINK-21906 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Jark Wu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21978) Disable cast conversion between Numeric type and TIMESTAMP_LTZ type
[ https://issues.apache.org/jira/browse/FLINK-21978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309850#comment-17309850 ] jinfeng commented on FLINK-21978: - [~ykt836] milliseconds > Disable cast conversion between Numeric type and TIMESTAMP_LTZ type > --- > > Key: FLINK-21978 > URL: https://issues.apache.org/jira/browse/FLINK-21978 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Leonard Xu >Assignee: Leonard Xu >Priority: Major > Labels: pull-request-available > > Currently we has supported the cast conversion between Numeric type and > TIMESTAMP_LTZ type, we suppose the numeric value e.g `Long type 1000L` as > epoch seconds and then cast to TIMESTAMP_LTZ, but the java.lang.Long is a > conversion class of `LocalZonedTimestampType` and treats as milliseconds. > To avoid the inconsistency, we should disable it and encourage user to use > `TO_TIMESTAMP_LTZ(numeric, precisoon)` function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21906) Support computed column syntax for Hive DDL dialect
[ https://issues.apache.org/jira/browse/FLINK-21906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309749#comment-17309749 ] jinfeng commented on FLINK-21906: - [~jark] I'm interested in this, can you assign it to me? > Support computed column syntax for Hive DDL dialect > --- > > Key: FLINK-21906 > URL: https://issues.apache.org/jira/browse/FLINK-21906 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Jark Wu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21907) Support watermark syntax for Hive DDL dialect
[ https://issues.apache.org/jira/browse/FLINK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309748#comment-17309748 ] jinfeng commented on FLINK-21907: - [~jark] I'm interested in this, can you assign it to me? > Support watermark syntax for Hive DDL dialect > - > > Key: FLINK-21907 > URL: https://issues.apache.org/jira/browse/FLINK-21907 > Project: Flink > Issue Type: Sub-task >Reporter: Jark Wu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21978) Disable cast conversion between Numeric type and TIMESTAMP_LTZ type
[ https://issues.apache.org/jira/browse/FLINK-21978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309471#comment-17309471 ] jinfeng commented on FLINK-21978: - +1 for introducing a built-in function for timestamp casting. On our company platform, we provide a common UDF to convert between TIMESTAMP and BIGINT which is widely used. It would be much better if it could be made into a built-in function. > Disable cast conversion between Numeric type and TIMESTAMP_LTZ type > --- > > Key: FLINK-21978 > URL: https://issues.apache.org/jira/browse/FLINK-21978 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Leonard Xu >Assignee: Leonard Xu >Priority: Major > Labels: pull-request-available > > Currently we have supported the cast conversion between Numeric types and the > TIMESTAMP_LTZ type: we treat the numeric value, e.g. `Long type 1000L`, as > epoch seconds and then cast it to TIMESTAMP_LTZ, but java.lang.Long is a > conversion class of `LocalZonedTimestampType` and is treated as milliseconds. > To avoid the inconsistency, we should disable it and encourage users to use the > `TO_TIMESTAMP_LTZ(numeric, precision)` function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
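As a worked example of why the explicit function is clearer than a cast (a small runnable sketch; the timestamps in the comments are shown in UTC):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ToTimestampLtzExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // precision 3: 1000 is milliseconds -> 1970-01-01 00:00:01
        tableEnv.executeSql("SELECT TO_TIMESTAMP_LTZ(1000, 3)").print();
        // precision 0: 1000 is seconds -> 1970-01-01 00:16:40
        tableEnv.executeSql("SELECT TO_TIMESTAMP_LTZ(1000, 0)").print();
    }
}
{code}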
[jira] [Commented] (FLINK-21892) Add documentation for the DESC statement syntax
[ https://issues.apache.org/jira/browse/FLINK-21892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305681#comment-17305681 ] jinfeng commented on FLINK-21892: - [~jark] Sure, please assign this to me. > Add documentation for the DESC statement syntax > --- > > Key: FLINK-21892 > URL: https://issues.apache.org/jira/browse/FLINK-21892 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table SQL / API >Reporter: Jark Wu >Priority: Major > > We suppored {{DESC}} as an abbreviation of the DESCRIBE statement in > FLINK-21847. We should update the documentation page: > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/describe/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21847) Introduce DESC grammar in sql parser
[ https://issues.apache.org/jira/browse/FLINK-21847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303823#comment-17303823 ] jinfeng commented on FLINK-21847: - [~fsk119] Can you assign this to me ? > Introduce DESC grammar in sql parser > > > Key: FLINK-21847 > URL: https://issues.apache.org/jira/browse/FLINK-21847 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Shengkai Fang >Priority: Major > > DESC is abbreviation of the DESCRIBE statement. DESC currently is only > supported in the sql client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21068) Add new timeout options for Elasticsearch connector
jinfeng created FLINK-21068: --- Summary: Add new timeout options for Elasticsearch connector Key: FLINK-21068 URL: https://issues.apache.org/jira/browse/FLINK-21068 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: 1.12.1 Reporter: jinfeng Currently, the connection.max-retry-timeout option seems not to work with the new Elasticsearch connector. The Elasticsearch community has removed setMaxRetryTimeoutMillis from RestClientBuilder. We can set timeout options when creating the RestHighLevelClient in Elasticsearch7ApiCallBridge, like {code:java} @Override public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException { RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])); builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { @Override public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) { if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_TIMEOUT)) { builder.setConnectTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_TIMEOUT))); } if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)) { builder.setSocketTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT))); } if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)) { builder.setConnectionRequestTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT))); } return builder; } }); {code} So we can add three table options to configure the Elasticsearch timeouts: connection.timeout, connection.socket-timeout, connection.request-timeout -- This message was sent by Atlassian Jira (v8.3.4#803005)
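In DDL form the proposal would look like this (the three connection.* option names are the proposal above, not an existing API):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsTimeoutOptionsSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tableEnv.executeSql(
                "CREATE TABLE esSink ("
                        + "  a VARCHAR,"
                        + "  b VARCHAR"
                        + ") WITH ("
                        + "  'connector' = 'elasticsearch-7',"
                        + "  'hosts' = 'http://localhost:9200',"
                        + "  'index' = 'users',"
                        + "  'connection.timeout' = '5000',"          // proposed
                        + "  'connection.socket-timeout' = '60000',"  // proposed
                        + "  'connection.request-timeout' = '5000'"   // proposed
                        + ")");
    }
}
{code}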
[jira] [Commented] (FLINK-20739) Ban `if` from HiveModule
[ https://issues.apache.org/jira/browse/FLINK-20739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254008#comment-17254008 ] jinfeng commented on FLINK-20739: - I see. If we can't ban it directly, maybe we can add an interface to the Hive module so that such functions can be configured more easily. [~lirui] > Ban `if` from HiveModule > -- > > Key: FLINK-20739 > URL: https://issues.apache.org/jira/browse/FLINK-20739 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.10.0 >Reporter: jinfeng >Priority: Major > > When using hiveModule, the if function is treated as a normal function. > If I have a SQL like this: > > {code:sql} > insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, > '-')[2], 'error') from Source {code} > > It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because > size(split(`test`, '-')) > 1, split(`test`, '-')[2] and 'error' will be > calculated first, and then the if function will be evaluated -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20739) Ban `if` from HiveModule
[ https://issues.apache.org/jira/browse/FLINK-20739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-20739: Description: When using hiveModule, the if function is treated as a normal function. If I have a SQL like this: {code:java} insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, '-')[2], 'error') from Source {code} It will throw arrayIndexOutOfBoundsException in Flink1.10, becase size(split(`test`, '-')-) > 1 , split(`test`, '')[2], ‘error’ will be calculated first, and then if function will be calculated was: When using hiveModule, the if function is treated as a normal function. If I have a SQL like this: {code:java} insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, '-')[10], 'error') from Source {code} It will throw arrayIndexOutOfBoundsException in Flink1.10, becase size(split(`test`, '-')) > 1 , split(`test`, '-')[10], ‘error’ will be calculated first, and then if function will be calculated > Ban `if` from HiveModule > -- > > Key: FLINK-20739 > URL: https://issues.apache.org/jira/browse/FLINK-20739 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.10.0 >Reporter: jinfeng >Priority: Major > > When using hiveModule, the if function is treated as a normal function. > If I have a SQL like this: > > {code:java} > insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, > '-')[2], 'error') from Source {code} > > It will throw arrayIndexOutOfBoundsException in Flink1.10, becase > size(split(`test`, '-')-) > 1 , split(`test`, '')[2], ‘error’ will be > calculated first, and then if function will be calculated -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20739) Ban `if` from HiveModule
jinfeng created FLINK-20739: --- Summary: Ban `if` from HiveModule Key: FLINK-20739 URL: https://issues.apache.org/jira/browse/FLINK-20739 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: jinfeng When using hiveModule, the if function is treated as a normal function. If I have a SQL like this: {code:sql} insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, '-')[10], 'error') from Source {code} It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because size(split(`test`, '-')) > 1, split(`test`, '-')[10] and 'error' will be calculated first, and then the if function will be evaluated -- This message was sent by Atlassian Jira (v8.3.4#803005)
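Until the resolution question is settled, one query-side workaround (a sketch; it assumes the tables from the description exist and relies on CASE WHEN only evaluating the branch that is taken):
{code:java}
// Given a TableEnvironment 'tableEnv' with Source, Sink and the HiveModule loaded:
tableEnv.executeSql(
        "INSERT INTO Sink SELECT "
                + "  CASE WHEN size(split(`test`, '-')) > 1 "
                + "       THEN split(`test`, '-')[10] "
                + "       ELSE 'error' END "
                + "FROM Source");
{code}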
[jira] [Commented] (FLINK-19937) Support sink parallelism option for all connectors
[ https://issues.apache.org/jira/browse/FLINK-19937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229106#comment-17229106 ] jinfeng commented on FLINK-19937: - [~lzljs3620320] Got it , thanks for your reply. > Support sink parallelism option for all connectors > -- > > Key: FLINK-19937 > URL: https://issues.apache.org/jira/browse/FLINK-19937 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Ecosystem >Reporter: Lsw_aka_laplace >Priority: Major > Labels: pull-request-available > > Since https://issues.apache.org/jira/browse/FLINK-19727 has been done. > SINK_PARALLELISM option and `ParallelismProvider` should be applied for all > existing `DynamicTableSink` of connectors in order to give users access to > setting their own sink parallelism. > > > Update: > Anybody who works on this issue should refrence to FLINK-19727~ > `ParallelismProvider` should work with `SinkRuntimeProvider`, actually > `SinkFunctionProvider` and `OutputFormatProvider` has implemented > `ParallelismProvider`. And `SINK_PARALLELISM` has already defined in > `FactoryUtil`, plz reuse it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19937) Support sink parallelism option for all connectors
[ https://issues.apache.org/jira/browse/FLINK-19937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228944#comment-17228944 ] jinfeng commented on FLINK-19937: - After setting the sink parallelism, the ordering of the data can no longer be guaranteed, because the default physical partitioning is rebalance. Should we add a config so that we can choose the physical partitioning? > Support sink parallelism option for all connectors > -- > > Key: FLINK-19937 > URL: https://issues.apache.org/jira/browse/FLINK-19937 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Ecosystem >Reporter: Lsw_aka_laplace >Priority: Major > Labels: pull-request-available > > Since https://issues.apache.org/jira/browse/FLINK-19727 has been done, the > SINK_PARALLELISM option and `ParallelismProvider` should be applied to all > existing `DynamicTableSink` connectors in order to give users access to > setting their own sink parallelism. > > > Update: > Anybody who works on this issue should refer to FLINK-19727~ > `ParallelismProvider` should work with `SinkRuntimeProvider`; actually > `SinkFunctionProvider` and `OutputFormatProvider` have implemented > `ParallelismProvider`. And `SINK_PARALLELISM` is already defined in > `FactoryUtil`, plz reuse it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
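For reference, the option as it is defined in FactoryUtil, applied to the print connector (which already supports it); a runnable sketch:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tableEnv.executeSql(
                "CREATE TABLE printSink ("
                        + "  a VARCHAR"
                        + ") WITH ("
                        + "  'connector' = 'print',"
                        + "  'sink.parallelism' = '4'"
                        + ")");
    }
}
{code}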
[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree
[ https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083380#comment-17083380 ] jinfeng commented on FLINK-17139: - [~wenlong.lwl] That's true. On master, StreamTableEnvironmentImpl#isEagerOperationTranslation returns false. > The sub-plan reuse optimizer can't reuse sub-plan that from different sql > node tree > > > Key: FLINK-17139 > URL: https://issues.apache.org/jira/browse/FLINK-17139 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: jinfeng >Priority: Major > > The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL > node trees. > {code:sql} > create table SourceTable ...; > create table SinkTable1 ; > create table SinkTable2 ; > insert into SinkTable1 select * from SourceTable; > insert into SinkTable2 select * from SourceTable; > {code} > SourceTable will not be reused when I execute the SQL statements above; > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree
[ https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083367#comment-17083367 ] jinfeng commented on FLINK-17139: - [~ykt836] , I execute the sql by using sqlUpdate() ; > The sub-plan reuse optimizer can't reuse sub-plan that from different sql > node tree > > > Key: FLINK-17139 > URL: https://issues.apache.org/jira/browse/FLINK-17139 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: jinfeng >Priority: Major > > The sub-plan reuse optimizer can't reuse sub-plan that from different sql > node tree . > {code:java} > //代码占位符 > create table SourceTable ...; > create table SinkTable1 ; > create table SinkTable2 ; > insert into SinkTable1 select * from SourceTable; > insert into SinkTable2 select * from SourceTable; > {code} > SourceTable will not be reuse when I execute the below sql statements; > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree
jinfeng created FLINK-17139: --- Summary: The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree Key: FLINK-17139 URL: https://issues.apache.org/jira/browse/FLINK-17139 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: jinfeng The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL node trees. {code:sql} create table SourceTable ...; create table SinkTable1 ; create table SinkTable2 ; insert into SinkTable1 select * from SourceTable; insert into SinkTable2 select * from SourceTable; {code} SourceTable will not be reused when I execute the SQL statements above; -- This message was sent by Atlassian Jira (v8.3.4#803005)
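For reference, since Flink 1.11 grouping the INSERTs into one StatementSet gives the optimizer a single tree, which is what the sub-plan reuse rules need; a sketch assuming the tables above are registered:
{code:java}
import org.apache.flink.table.api.StatementSet;

// Given a TableEnvironment 'tableEnv' where the tables above exist:
StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO SinkTable1 SELECT * FROM SourceTable");
statementSet.addInsertSql("INSERT INTO SinkTable2 SELECT * FROM SourceTable");
// Both statements are translated and optimized together, so the scan of
// SourceTable can be reused across the two pipelines.
statementSet.execute();
{code}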
[jira] [Updated] (FLINK-16451) listagg with distinct for over window codegen error
[ https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-16451: Description: When I use lisgagg with distinct and over window. {code:java} //代码占位符 "select listagg(distinct product, '|') over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable {code} I got the follwing exception {code:java} //代码占位符 Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.get(ArrayList.java:433) at java.util.Collections$UnmodifiableList.get(Collections.java:1311) at org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) {code} But It worked with {code:java} //代码占位符 select listagg(distinct product) over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable {code} The exception will be throw at the below code. {code:java} //代码占位符 private def generateKeyExpression( ctx: CodeGeneratorContext, generator: ExprCodeGenerator): GeneratedExpression = { val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess( ctx, generator.input1Type, generator.input1Term, _, nullableInput = false, deepCopy = inputFieldCopy)) {code} The distinctInfo.argIndexs is [1, 3] . But the index 3 is a logical index. It will be replaced by '|' . And should not generate Input Access for index 3 was: When I use lisgagg with distinct and over window. {code:java} //代码占位符 "select listagg(distinct product, '|') over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable {code}
[jira] [Updated] (FLINK-16451) listagg with distinct for over window codegen error
[ https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinfeng updated FLINK-16451: Summary: listagg with distinct for over window codegen error (was: listagg with distinct for over window ) > listagg with distinct for over window codegen error > > > Key: FLINK-16451 > URL: https://issues.apache.org/jira/browse/FLINK-16451 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.2, 1.10.0 >Reporter: jinfeng >Priority: Major > > When I use lisgagg with distinct and over window. > {code:java} > //代码占位符 > "select listagg(distinct product, '|') over(partition by user order by > proctime rows between 200 preceding and current row) as product, user from " > + testTable > {code} > I got the follwing exception > {code:java} > //代码占位符 > Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, > Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at > java.util.ArrayList.get(ArrayList.java:433) at > java.util.Collections$UnmodifiableList.get(Collections.java:1311) at > org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635) > at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620) > at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > {code} > But It worked with > {code:java} > //代码占位符 > select listagg(distinct product) over(partition by user order by proctime > rows between 200 preceding and current row) as product, user from " + > testTable > {code} > > {code:java} > //代码占位符 > private def generateKeyExpression( > ctx: CodeGeneratorContext, > generator: ExprCodeGenerator):
[jira] [Created] (FLINK-16451) listagg with distinct for over window
jinfeng created FLINK-16451: --- Summary: listagg with distinct for over window Key: FLINK-16451 URL: https://issues.apache.org/jira/browse/FLINK-16451 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0, 1.9.2 Reporter: jinfeng When I use lisgagg with distinct and over window. {code:java} //代码占位符 "select listagg(distinct product, '|') over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable {code} I got the follwing exception {code:java} //代码占位符 Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.get(ArrayList.java:433) at java.util.Collections$UnmodifiableList.get(Collections.java:1311) at org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374) at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56) at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) {code} But It worked with {code:java} //代码占位符 select listagg(distinct product) over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable {code} {code:java} //代码占位符 private def generateKeyExpression( ctx: CodeGeneratorContext, generator: ExprCodeGenerator): GeneratedExpression = { val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess( ctx, generator.input1Type, generator.input1Term, _, nullableInput = false, deepCopy = inputFieldCopy)) {code} The exception will be throw at the below code. The distinctInfo.argIndexs is [1, 3] . But the index 3 is a logical index. It will be replaced by '|' . And should not generate Input Access for index 3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14356) Introduce "single-field" format to (de)serialize message to a single field
[ https://issues.apache.org/jira/browse/FLINK-14356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948788#comment-16948788 ] jinfeng commented on FLINK-14356: - I am very happy to contribute this . It would be simple to implement the raw format that only support VARBINARY > Introduce "single-field" format to (de)serialize message to a single field > -- > > Key: FLINK-14356 > URL: https://issues.apache.org/jira/browse/FLINK-14356 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / API >Reporter: jinfeng >Priority: Major > > I want to use flink sql to write kafka messages directly to hdfs. The > serialization and deserialization of messages are not involved in the middle. > The bytes of the message directly convert the first field of Row. However, > the current RowSerializationSchema does not support the conversion of bytes > to VARBINARY. Can we add some special RowSerializationSchema and > RowDerializationSchema ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14356) Support some special RowDeserializationSchema and RowSerializationSchema
jinfeng created FLINK-14356: --- Summary: Support some special RowDeserializationSchema and RowSerializationSchema Key: FLINK-14356 URL: https://issues.apache.org/jira/browse/FLINK-14356 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API Reporter: jinfeng I want to use Flink SQL to write Kafka messages directly to HDFS, without serializing or deserializing the messages in the middle; the bytes of the message directly become the first field of the Row. However, the current RowSerializationSchema does not support the conversion of bytes to VARBINARY. Can we add some special RowSerializationSchema and RowDeserializationSchema? -- This message was sent by Atlassian Jira (v8.3.4#803005)
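For readers landing here later: Flink 1.12 eventually shipped a built-in 'raw' format that covers this use case. At the time of this ticket, the idea in DDL form would have looked roughly like this (the format name 'single-field' is the proposal here, not an existing option):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SingleFieldFormatSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tableEnv.executeSql(
                "CREATE TABLE kafkaSource ("
                        + "  message VARBINARY"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'events',"
                        + "  'format' = 'single-field'"  // proposed name
                        + ")");
    }
}
{code}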