[jira] [Created] (FLINK-20951) Flink SQL CLI query on a Hive table partitioned by two fields throws an exception when the WHERE condition does not specify both partition fields

2021-01-12 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-20951:


 Summary: Flink SQL CLI query on a Hive table partitioned by two fields 
throws an exception when the WHERE condition does not specify both partition 
fields
 Key: FLINK-20951
 URL: https://issues.apache.org/jira/browse/FLINK-20951
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Table SQL / Runtime
Affects Versions: 1.12.0
 Environment: flink 1.12.0 (release-1.12 branch)
sql-cli
Reporter: YUJIANBO


The production Hive table is partitioned by two fields: datekey and event.

I have run the following test:
(1) First comparison:
SELECT vid FROM table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid = 'aa';   (OK)
SELECT vid FROM table_A WHERE datekey = '20210112' AND vid = 'aa';   (Error)

(2) Second comparison:
SELECT vid FROM table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid = 'bb';   (OK)
SELECT vid FROM table_B WHERE datekey = '20210112' AND vid = 'bb';   (Error)
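
The same comparison can also be reproduced from the Table API instead of the SQL CLI. A minimal sketch (the catalog name "myhive" and the hive-conf path are illustrative; only the two queries come from the test above):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HivePartitionPruneRepro {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Register and use the Hive catalog (catalog name and conf path are illustrative).
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/usr/local/hive/conf");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Works: both partition fields (datekey, event) are specified.
        tableEnv.executeSql(
                "SELECT vid FROM table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid = 'aa'")
                .print();

        // Fails with the exception below: only one partition field is specified.
        tableEnv.executeSql(
                "SELECT vid FROM table_A WHERE datekey = '20210112' AND vid = 'aa'")
                .print();
    }
}
{code}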

The exception is:
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.position(Buffer.java:244)
    at org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
    at org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
    at org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
    at org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
    ... 6 more



Related Flink user mailing list thread:
http://apache-flink.147419.n8.nabble.com/Flink-sql-cli-hive-where-td10139.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21142) Flink guava dependency problem

2021-01-25 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-21142:


 Summary: Flink guava dependency problem
 Key: FLINK-21142
 URL: https://issues.apache.org/jira/browse/FLINK-21142
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility, Connectors / Hive
Affects Versions: 1.12.0
Reporter: YUJIANBO


We set up a new Hadoop cluster and use a Flink 1.12.0 build compiled from the 
release-1.12.0 branch. If I add the Hive jars to flink/lib/, errors are 
reported.

*Operating environment:*
    flink1.12.0 
    Hadoop 3.3.0
    hive 3.1.2

*Command used to run the official Flink demo: /tmp/yjb/buildjar/flink1.12.0/bin/flink run -m 
yarn-cluster /usr/local/flink1.12.0/examples/streaming/WordCount.jar*

If I put either of the jars *flink-sql-connector-hive-3.1.2_2.11-1.12.0.jar* or 
*hive-exec-3.1.2.jar* in the lib directory and execute the above command, the 
following error is reported:
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
*We can see that this is a guava dependency conflict* (a small diagnostic for 
confirming which jar wins is sketched after the list of guava jars below).

*The guava versions on my cluster:*
 /usr/local/hadoop-3.3.0/share/hadoop/yarn/csi/lib/guava-20.0.jar
 /usr/local/hadoop-3.3.0/share/hadoop/common/lib/guava-27.0-jre.jar
 /usr/local/apache-hive-3.1.2-bin/lib/guava-20.0.jar
 /usr/local/apache-hive-3.1.2-bin/lib/jersey-guava-2.25.1.jar
 /usr/local/spark-3.0.1-bin-hadoop3.2/jars/guava-14.0.1.jar
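
To confirm which guava jar actually provides the conflicting class at runtime, I can run a small diagnostic like the sketch below (the class name is illustrative; it only prints the code source of Preconditions):

{code:java}
import com.google.common.base.Preconditions;

// Diagnostic sketch: print which jar on the classpath the conflicting
// Preconditions class is loaded from.
public class GuavaOrigin {
    public static void main(String[] args) {
        System.out.println(
                Preconditions.class.getProtectionDomain().getCodeSource().getLocation());
    }
}
{code}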

*Can you give me some advice?*
 Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22806) The checkpoint state of a Flink SQL job keeps growing

2021-05-30 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-22806:


 Summary: The checkpoint state of a Flink SQL job keeps growing
 Key: FLINK-22806
 URL: https://issues.apache.org/jira/browse/FLINK-22806
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.12.0
Reporter: YUJIANBO


I have added the following parameters:
   1. state.idle.duration: 3600
   2. state.checkpoints.num-retained: 3
   3. SQL: select count(*), LISTAGG(concat(m, n)) from tableA group by a, b

I set the state retention (state.idle.duration) to one hour, but the size of the 
/checkpoint/shared directory keeps growing. I observed it for two days and 
suspect that expired data in the /checkpoint/shared folder is not being 
cleared.

What else can limit the growth of state?
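
For reference, a minimal sketch (retention values are illustrative) of setting idle state retention from the Table API, in case the job is submitted programmatically; this is the usual way to bound state for an unbounded group-by aggregation:

{code:java}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // State for keys that stay idle longer than the maximum retention becomes
        // eligible for cleanup; the maximum must be at least 5 minutes larger than
        // the minimum.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));

        // The aggregation from the report (table and field names as given there):
        // tableEnv.executeSql(
        //         "select count(*), LISTAGG(concat(m, n)) from tableA group by a, b");
    }
}
{code}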



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18862) The result of count(1) from an SQL agg operator cannot be cast to the String type

2020-08-09 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-18862:


 Summary: The result of count(1) from an SQL agg operator cannot be cast to the String type
 Key: FLINK-18862
 URL: https://issues.apache.org/jira/browse/FLINK-18862
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: YUJIANBO






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19768) The "./yarn-session.sh" script does not use log4j-session.properties; it uses log4j.properties

2020-10-22 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-19768:


 Summary: The "./yarn-session.sh" script does not use 
log4j-session.properties; it uses log4j.properties
 Key: FLINK-19768
 URL: https://issues.apache.org/jira/browse/FLINK-19768
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.11.2
Reporter: YUJIANBO


The "./yarn-session.sh" script does not use log4j-session.properties; it uses 
log4j.properties.

My Flink job UI shows that $internal.yarn.log-config-file is 
"/usr/local/flink-1.11.2/conf/log4j.properties". Is this a bug?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23643) Hit the same problem again: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

2021-08-05 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-23643:


 Summary: Hit the same problem again: ROW/RANGE not allowed with 
RANK, DENSE_RANK or ROW_NUMBER functions
 Key: FLINK-23643
 URL: https://issues.apache.org/jira/browse/FLINK-23643
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: YUJIANBO


I found a problem that was reportedly fixed in 1.12.0 by FLINK-18440, but I hit 
the same problem again:
ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions.

This is my SQL (a frame-free variant is sketched after it):
```
create temporary view tmp as 
select 
   area_zip,
   store_id,
   product_id,
   batch_id,
   RANK() over (PARTITION BY area_zip,store_id,product_id ORDER BY ts DESC 
RANGE BETWEEN INTERVAL '5' MINUTE preceding AND CURRENT ROW) AS num_rank 
from pricechangelogs;


insert into print_table 
select * from tmp where num_rank = 1;
```
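
For comparison, a sketch (mine, not from the job) of the frame-free Top-N form that the streaming planner's rewrite recognises; it uses ROW_NUMBER() and drops the 5-minute RANGE bound, so the semantics are not identical to the query above (a TableEnvironment named tableEnv with the same source table is assumed):
```
// Frame-free ranking: no ROW/RANGE clause, so the Top-N rewrite can apply.
tableEnv.executeSql(
        "create temporary view tmp_topn as "
                + "select area_zip, store_id, product_id, batch_id, "
                + "ROW_NUMBER() over (PARTITION BY area_zip, store_id, product_id "
                + "ORDER BY ts DESC) AS num_rank "
                + "from pricechangelogs");

tableEnv.executeSql(
        "insert into print_table select * from tmp_topn where num_rank = 1");
```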



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24224) Table to stream: only a Row-typed stream works with CEP; POJO, Map and JSONObject streams produce no output, although any datatype works with CEP when using only the DataStream API

2021-09-08 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-24224:


 Summary: Table to stream: only a Row-typed stream works with CEP; 
POJO, Map and JSONObject streams produce no output, although any datatype 
works with CEP when using only the DataStream API.
 Key: FLINK-24224
 URL: https://issues.apache.org/jira/browse/FLINK-24224
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.13.2, 1.12.0, 1.11.2
Reporter: YUJIANBO


 

1. Problem: *Table to stream*: only a *ROW*-typed stream works with *CEP*; 
streams of other datatypes (POJO, Map, JSONObject) produce no output, although 
any datatype works with CEP when using only the DataStream API.

2. Version: I have tried three versions: 1.11.2, 1.12.0 and 1.13.2.

3. Code:

(1) Table to stream to CEP (only the Row datatype is OK; for the other 
datatypes the stream fed to CEP prints no data and there is no error message; 
a sketch of the Row-typed variant follows the code block)
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);

Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                // put the row's field names and values into the map
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("wStart") * 1000;
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for"));
// it has data to print
ds.print();

Pattern<JSONObject, ?> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
// it has no data to print
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx,
            Collector<String> out) throws Exception {
        out.collect("==>>>" + match.toString());
    }
}).print();
{code}
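
For contrast, a sketch of the Row-typed path described above as working (reconstructed from the description, not code from the report; the key selector is illustrative and timestamp/watermark assignment is omitted):

{code:java}
// Sketch only: feed the Row-typed append stream to CEP directly. According to
// the description above, this variant does print matches, unlike the
// JSONObject stream. Watermark assignment (as in the block above) is omitted.
DataStream<Row> rows = tableEnv.toAppendStream(tb, Row.class);

Pattern<Row, ?> rowPattern = Pattern.<Row>begin("begin")
        .where(new SimpleCondition<Row>() {
            @Override
            public boolean filter(Row value) {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

CEP.pattern(rows.keyBy(r -> String.valueOf(r.getField(0))), rowPattern)
        .process(new PatternProcessFunction<Row, String>() {
            @Override
            public void processMatch(Map<String, List<Row>> match, Context ctx,
                    Collector<String> out) {
                out.collect(match.toString());
            }
        }).print();
{code}
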
(2) *Only the DataStream API to CEP* (any datatype works)
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
// KafkaRecordSchema is a custom deserialization schema; the element type is
// assumed to be ConsumerRecord<String, String> here.
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("ts");
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for") + x.getString("request_uri"))
        .timeWindow(Time.seconds(1))
        .apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> input,
                    Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct", n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

// it is ok
Pattern<JSONObject, ?> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) thr