[jira] [Created] (FLINK-26954) Support Kafka bounded read in TableAPI

2022-03-31 Thread Yuan Zhu (Jira)
Yuan Zhu created FLINK-26954:


 Summary: Support Kafka bounded read in TableAPI
 Key: FLINK-26954
 URL: https://issues.apache.org/jira/browse/FLINK-26954
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Yuan Zhu


Kafka setBounded in the DataStream API has been implemented since 1.12, but it has
not been ported to the Table API yet.
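For illustration only, a bounded read could be exposed through connector options in the Kafka DDL. The option names below (scan.bounded.mode, scan.bounded.timestamp-millis) and the timestamp value are a hypothetical sketch of such a design, not an existing API:

{code:sql}
-- Hypothetical DDL: consume the topic up to a fixed timestamp and then
-- terminate the source, mirroring setBounded in the DataStream API.
create table kfk_bounded (
    user_id varchar
) with (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    -- illustrative, not-yet-existing options:
    'scan.bounded.mode' = 'timestamp',
    'scan.bounded.timestamp-millis' = '1648684800000'
);
{code}

In the DataStream API the same effect is achieved with KafkaSourceBuilder#setBounded(OffsetsInitializer).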



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25529) java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter when write bulkly into hive-2.1.1 orc table

2022-01-05 Thread Yuan Zhu (Jira)
Yuan Zhu created FLINK-25529:


 Summary: java.lang.ClassNotFoundException: 
org.apache.orc.PhysicalWriter when write bulkly into hive-2.1.1 orc table
 Key: FLINK-25529
 URL: https://issues.apache.org/jira/browse/FLINK-25529
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
 Environment: hive 2.1.1

flink 1.12.4
Reporter: Yuan Zhu
 Attachments: lib.jpg

I tried to write data in bulk into a hive-2.1.1 table in ORC format, and encountered 
java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter

 

I used the bulk writer by setting table.exec.hive.fallback-mapred-writer = false:

 
{code:java}
SET 'table.sql-dialect'='hive';
create table orders(
    order_id int,
    order_date timestamp,
    customer_name string,
    price decimal(10,3),
    product_id int,
    order_status boolean
)partitioned by (dt string)
stored as orc;
 
SET 'table.sql-dialect'='default';

create table datagen_source (
    order_id int,
    order_date timestamp(9),
    customer_name varchar,
    price decimal(10,3),
    product_id int,
    order_status boolean
) with ('connector' = 'datagen');

create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/mnt/conf');
set table.exec.hive.fallback-mapred-writer = false;

insert into myhive.`default`.orders
/*+ OPTIONS(
    'sink.partition-commit.trigger'='process-time',
    'sink.partition-commit.policy.kind'='metastore,success-file',
    'sink.rolling-policy.file-size'='128MB',
    'sink.rolling-policy.rollover-interval'='10s',
    'sink.rolling-policy.check-interval'='10s',
    'auto-compaction'='true',
    'compaction.file-size'='1MB'    ) */
select *, date_format(now(),'yyyy-MM-dd') as dt from datagen_source;  {code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter

 

The jars in my lib dir are listed in the attachment.

In HiveTableSink#createStreamSink (line 270), createBulkWriterFactory is called when 
table.exec.hive.fallback-mapred-writer is false.

If the table is stored as ORC, HiveShimV200#createOrcBulkWriterFactory is invoked.

OrcBulkWriterFactory depends on org.apache.orc.PhysicalWriter from orc-core, but 
flink-connector-hive excludes orc-core because it conflicts with hive-exec.
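As a possible workaround until the packaging is fixed, the missing class can be supplied manually. The jar path and version below are illustrative; the orc-core version must match what flink-orc 1.12 was built against, and putting plain orc-core on the classpath may reintroduce the very hive-exec conflict the exclusion was meant to avoid:

{code:sql}
-- On Flink 1.12 the jar has to be copied into $FLINK_HOME/lib/ and the
-- cluster restarted; ADD JAR only exists in newer SQL client versions.
ADD JAR '/path/to/orc-core-1.5.6.jar';
{code}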

 





[jira] [Created] (FLINK-25409) Add cache metric to LookupFunction

2021-12-21 Thread Yuan Zhu (Jira)
Yuan Zhu created FLINK-25409:


 Summary: Add cache metric to LookupFunction
 Key: FLINK-25409
 URL: https://issues.apache.org/jira/browse/FLINK-25409
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Reporter: Yuan Zhu


Since we frequently encounter performance problems with lookup joins in production, 
adding metrics to monitor the lookup function cache would be very helpful for 
troubleshooting.





[jira] [Created] (FLINK-25336) Kafka connector compatible problem in Flink sql

2021-12-15 Thread Yuan Zhu (Jira)
Yuan Zhu created FLINK-25336:


 Summary: Kafka connector compatible problem in Flink sql
 Key: FLINK-25336
 URL: https://issues.apache.org/jira/browse/FLINK-25336
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0
 Environment: Flink 1.14.0

Kafka 0.10.2.1
Reporter: Yuan Zhu
 Attachments: log.jpg

When I use SQL to query a Kafka table, like:
{code:java}
create table `kfk`
(
    user_id VARCHAR
) with (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '163941120',
    'properties.group.id' = 'test'
);

CREATE TABLE print_table (user_id varchar) WITH ('connector' = 'print');

insert into print_table select user_id from kfk;{code}
It fails with the following exception:

org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest 
versions older than 4 don't support the allowAutoTopicCreation field

!log.jpg!
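A workaround that may help here (I have not verified it against Kafka 0.10.2.1): the allowAutoTopicCreation field only exists in MetadataRequest v4+, so disabling client-side auto topic creation via the pass-through properties should let the client fall back to an older request version:

{code:sql}
create table `kfk`
(
    user_id VARCHAR
) with (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '163941120',
    'properties.group.id' = 'test',
    -- forwarded to the Kafka consumer; avoids requiring MetadataRequest v4+
    'properties.allow.auto.create.topics' = 'false'
);
{code}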


