Re: Flink Elastic Sink

2020-05-30 Thread Leonard Xu
Hi, aj

> I was confused before as I was thinking the sink builder is called only once 
> but it gets called for every batch request, correct me if my understanding is 
> wrong. 

You're right: the sink builder should be called only once, not for every
batch request. Could you post a snippet of the code where you create and use the sink?

Best,
Leonard Xu



> On Fri, May 29, 2020 at 9:08 AM Leonard Xu wrote:
> Hi, aj
> 
> In the implementation of ElasticsearchSink, ElasticsearchSink won't create the
> index and only starts an Elasticsearch client for sending requests to
> the Elasticsearch cluster. You can simply extract the index (the date value in your
> case) from your timestamp field and put it into an IndexRequest [1];
> ElasticsearchSink will send the IndexRequests to the Elasticsearch cluster, and the
> cluster will create the corresponding index and flush the records.
> 
> BTW, if you're using Flink SQL you can use a dynamic index in the Elasticsearch SQL
> connector [2]; you can simply configure 'connector.index' =
> 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.
> 
> Best,
> Leonard Xu
> [1] 
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>> On May 29, 2020, at 02:43, aj <ajainje...@gmail.com> wrote:
>> 
>> Hello All,
>> 
>> I am getting many events in Kafka and I have written a Flink job that sinks
>> the Avro records from Kafka to S3 in Parquet format.
>> 
>> Now, I want to sink these records into Elasticsearch, but the challenge is
>> that I want to write the records to time-based indices. Basically, in
>> Elasticsearch I want to create a per-day index with the date as the suffix.
>> So if I create an ES sink in the Flink streaming job, how do I make the sink
>> start writing to a new index when the first event of the day arrives?
>> 
>> Thanks,
>> Anuj. 
> 
> 
> -- 
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877 
> Skype : anuj.jain07


Flink s3 streaming performance

2020-05-30 Thread venkata sateesh` kolluru
Hello,

I have posted the same question on Stack Overflow but didn't get any response,
so I am posting it here for help.

https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787

Details:

I am working on a Flink application on Kubernetes (EKS) which consumes data
from Kafka and writes it to S3.

We have around 120 million XML messages, about 4 TB in total, in Kafka. Consuming
from Kafka is super fast.

These are just string messages from Kafka.

There is high backpressure while writing to S3. We are not even hitting
the S3 PUT request limit, which is around 3,500 requests/sec. I am seeing
only 300 writes per minute to S3, which is very slow.

I am using StreamingFileSink to write to S3 with the rolling policy set to
OnCheckpointRollingPolicy.

I am using flink-s3-fs-hadoop-*.jar and s3:// as the path scheme (not s3a:// or s3p://).

Other than this I don't have any configuration related to S3:

StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
        .forRowFormat(new Path("s3://BUCKET"),
                (Tuple3<String, String, String> element, OutputStream stream) -> {
                    PrintStream out = new PrintStream(stream);
                    out.println(element.f2);
                })
        // Determine component type for each record
        .withBucketAssigner(new CustomBucketAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
        .build();

Is there anything that we can optimize for S3, either in the StreamingFileSink or
in flink-conf.yaml?

For example, using a bulk format, or any config parameters like fs.s3.maxThreads, etc.?
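
On the bulk-format idea: if the XML payload can be mapped to a POJO or Avro type, a Parquet bulk writer is one option. Below is a minimal sketch under that assumption (requires the flink-parquet dependency; the Record type is made up and not from the job above):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BulkFormatSketch {

    // Hypothetical POJO standing in for the parsed XML payload.
    public static class Record {
        public String componentType;
        public String body;
    }

    public static StreamingFileSink<Record> buildSink() {
        // Bulk formats always roll on checkpoint, so no explicit rolling policy is needed;
        // a bucket assigner can be plugged in the same way as with the row format.
        return StreamingFileSink
                .forBulkFormat(new Path("s3://BUCKET/output"),
                        ParquetAvroWriters.forReflectRecord(Record.class))
                .build();
    }
}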

For checkpointing too I am using s3:// instead of s3p:// or s3a://:

env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
env.enableCheckpointing(30);


Re: Auto adjusting watermarks?

2020-05-30 Thread Theo Diefenthal
Hi Congxian, 

Thanks for your feedback. You raised a point I had also already thought about. As 
"assignTimestampsAndWatermarks" creates an operator extending the standard 
AbstractUdfStreamOperator, I could also implement a RichFunction watermark 
assigner with full state access. In my case, I was also wondering whether it's 
a good idea to have a stateful watermark assigner, or whether it's more practical 
to start with no state and build my histogram over time, fresh with each job 
restart. That's also why I asked on the mailing list, so I can get feedback from 
other people who customize their watermark assigners. 

Best regards 
Theo 


From: "Congxian Qiu" 
To: "Theo Diefenthal" 
CC: "user" 
Sent: Saturday, 30 May 2020 05:06:12 
Subject: Re: Auto adjusting watermarks? 

Hi 
Could you store histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` 
and adjust the `maxOutOfOrderness` according to that histogram data; would that 
work for your case? (Be careful: such histogram data would not be snapshotted 
when checkpointing.) 

Best, 
Congxian 


Theo Diefenthal <theo.diefent...@scoop-software.de> wrote on Saturday, May 30, 2020, 4:35 AM: 



Hi there, 

Currently I have a job pipeline reading data from more than 10 different kinds of 
sources, each having different out-of-orderness characteristics. I am 
currently working on adjusting the watermarks for each source "properly". I 
work with BoundedOutOfOrdernessTimestampExtractor and, as usual, I want the 
maxOutOfOrderness as low as possible while still keeping as many elements as 
possible on time, as late arrivals trigger rather expensive computations. 

Now I thought, what I probably want is something like "I want to have about 
99.9% of my elements within the allowed lateness". Of course, I don't know the 
out-of-orderness of future events, but I can predict it from the past, e.g. via a 
histogram with a 99.9th percentile, and adjust the maxOutOfOrderness 
dynamically. 

As Flink only provides rather simple timestamp assigners out of the box, but allows me 
to create my own with arbitrary complexity, I was wondering if somebody of you 
has already done something like that, whether it's a viable approach and whether 
I'm on a good track here. 
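
If it helps the discussion, here is a rough sketch of what such an assigner could look like (untested, nothing is checkpointed, the sample buffer and percentile computation are deliberately simplistic, and reading the event time from the record is left abstract):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;
import java.util.Arrays;

/**
 * Records how far behind the current maximum timestamp each element arrives and
 * periodically sets maxOutOfOrderness to roughly the requested percentile of
 * those observations. The sample buffer is not checkpointed, so it is rebuilt
 * after every restart.
 */
public abstract class PercentileWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final double percentile;                   // e.g. 0.999 for "keep ~99.9% of elements on time"
    private final long[] lateness = new long[10_000];  // fixed-size buffer of observed out-of-orderness (ms)
    private int next;

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long maxOutOfOrderness = 1_000L;           // initial bound until the buffer fills up

    protected PercentileWatermarkAssigner(double percentile) {
        this.percentile = percentile;
    }

    /** How to read the event time from a record; left abstract on purpose. */
    protected abstract long getEventTimestamp(T element);

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        long ts = getEventTimestamp(element);
        if (ts >= currentMaxTimestamp) {
            currentMaxTimestamp = ts;
        } else {
            lateness[next++] = currentMaxTimestamp - ts;
            if (next == lateness.length) {   // buffer full: recompute the bound and start over
                long[] sorted = lateness.clone();
                Arrays.sort(sorted);
                int idx = Math.min(sorted.length - 1, (int) (percentile * sorted.length));
                maxOutOfOrderness = sorted[idx];
                next = 0;
            }
        }
        return ts;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? new Watermark(Long.MIN_VALUE)
                : new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1);
    }
}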

Best regards 
Theo 





Re: Data Stream Enrichment

2020-05-30 Thread Lasse Nedergaard
Hi. 
If you can cache the data in state, that's the preferred way: read all your 
values from the store, do a keyBy, and store them in state in a CoProcessFunction. 
If you need to do a lookup for every row, you have to use the Async I/O function.
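
A bare-bones sketch of the Async I/O route (the lookup client interface below is a placeholder for whatever async-capable driver you use, and the timeout/capacity values are arbitrary):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncEnrichment {

    // Placeholder for an async lookup client (e.g. a reactive driver wrapped behind this interface).
    public interface LookupClient extends AutoCloseable {
        CompletableFuture<String> lookup(String key);
    }

    public abstract static class EnrichFunction extends RichAsyncFunction<String, String> {
        private transient LookupClient client;

        /** Build the real client here; intentionally left abstract in this sketch. */
        protected abstract LookupClient createClient();

        @Override
        public void open(Configuration parameters) {
            client = createClient();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            client.lookup(key).whenComplete((value, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(key + "," + value));
                }
            });
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }

    public static DataStream<String> enrich(DataStream<String> keys, EnrichFunction fn) {
        // at most 100 in-flight lookups, 5 s timeout per lookup
        return AsyncDataStream.unorderedWait(keys, fn, 5, TimeUnit.SECONDS, 100);
    }
}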

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 30 May 2020, at 18:49, Taher Koitawala wrote:
> 
> 
> The open() method would be a great place to set up the connection, and the close()
> method could close it when the operator shuts down!
> 
> Also, for external calls, Async I/O is a great operator. Give that a look.
> 
> 
> Regards,
> Taher Koitawala
> 
>> On Sat, May 30, 2020, 10:17 PM Aissa Elaffani wrote:
>> Hello Guys,
>> I want to enrich a data stream with some MongoDB data, and I am willing to
>> use the RichFlatMapFunction, but I am lost; I don't know where to configure
>> the connection to my MongoDB. Can anyone help me with this?
>> Best,
>> Aissa
>> 


Re: Data Stream Enrichment

2020-05-30 Thread Taher Koitawala
The open() method would be a great place to set up the connection, and the close()
method could close it when the operator shuts down!
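
To make that concrete, a minimal sketch assuming the synchronous MongoDB Java driver; the connection string, database, collection and field names are made up:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.bson.Document;

public class MongoEnrichFlatMap extends RichFlatMapFunction<String, String> {

    private transient MongoClient mongoClient;
    private transient MongoCollection<Document> collection;

    @Override
    public void open(Configuration parameters) {
        // The connection is created once per parallel task instance, not per record.
        mongoClient = MongoClients.create("mongodb://mongo-host:27017");
        collection = mongoClient.getDatabase("enrichment").getCollection("devices");
    }

    @Override
    public void flatMap(String deviceId, Collector<String> out) {
        // Blocking lookup per record; for high throughput consider Async I/O instead.
        Document doc = collection.find(Filters.eq("deviceId", deviceId)).first();
        if (doc != null) {
            out.collect(deviceId + "," + doc.getString("owner"));
        }
    }

    @Override
    public void close() {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}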

Also, for external calls, Async I/O is a great operator. Give that a look.


Regards,
Taher Koitawala

On Sat, May 30, 2020, 10:17 PM Aissa Elaffani wrote:

> Hello Guys,
> I want to enrich a data stream with some MongoDB data, and I am willing to
> use the RichFlatMapFunction, but I am lost; I don't know where to configure
> the connection to my MongoDB. Can anyone help me with this?
> Best,
> Aissa
>
>


Data Stream Enrichment

2020-05-30 Thread Aissa Elaffani
Hello Guys,
I want to enrich a data stream with some MongoDB data, and I am willing to
use the RichFlatMapFunction, but I am lost; I don't know where to configure
the connection to my MongoDB. Can anyone help me with this?
Best,
Aissa


State expiration in Flink

2020-05-30 Thread Vasily Melnik
Hi.
I'm a bit confused by this point in the State TTL documentation:
"By default, expired values are explicitly removed on read, such as
ValueState#value, and periodically garbage collected in the background if
supported by the configured state backend."
Does it mean that if I have only one event with a specific key, its state
will never be cleaned up on TTL expiration, because I will never call the
value() method for this key again?
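
For reference, the same documentation page also describes cleanup strategies that remove expired state without it being read again. A minimal sketch of enabling them explicitly (the TTL value, update type and cleanup settings below are illustrative, not a recommendation):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlExample {

    public static ValueStateDescriptor<String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // background cleanup that does not rely on the state ever being read again:
                .cleanupFullSnapshot()                // prune expired entries when taking full snapshots
                .cleanupInRocksdbCompactFilter(1000)  // for RocksDB: drop expired entries during compaction
                .build();

        ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("my-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}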


Re: Re: Re: Flink Window with multiple trigger condition

2020-05-30 Thread aj
Thanks Yun.

I have converted the code to use a KeyedProcessFunction rather than a
flatMap, and with a registered timer it worked.
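
For readers following along, a trimmed sketch of that pattern (not the original job code; the key/value types, the 30-minute gap and the emitted output are illustrative): a KeyedProcessFunction that refreshes a processing-time timer on every event and fires when no further event arrives for the key within the gap:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Emits a session-closed marker 30 minutes after the last event for a key, even with no further events. */
public class SessionTimeoutFunction extends KeyedProcessFunction<String, String, String> {

    private static final long GAP_MS = 30 * 60 * 1000L;

    private transient ValueState<Long> timerState; // timestamp of the currently registered timer

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        Long registered = timerState.value();
        if (registered != null) {
            // drop the timer registered for the previous event of this key
            ctx.timerService().deleteProcessingTimeTimer(registered);
        }
        long fireAt = ctx.timerService().currentProcessingTime() + GAP_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerState.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fires GAP_MS after the last event for this key: close and emit the session here
        out.collect("session closed for key " + ctx.getCurrentKey() + " at " + timestamp);
        timerState.clear();
    }
}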

On Fri, May 29, 2020 at 11:13 AM Yun Gao  wrote:

> Hi,
>
>  I think you could use *timer* to achieve that. In *processFunction*
> you could register a timer at specific time (event time or processing time)
> and get callbacked at that point. It could be registered like
>
>
> ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>
>
> More details on timer could be found in [1] and an example is in [2].
> In this example, a timer is registered in the last line of the
> *processElement* method, and the callback is implemented by override the
> *onTimer* method.
>
>[1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
>[2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
>
>
> ------ Original Mail ------
> Sender: aj
> Send Date: Fri May 29 02:07:33 2020
> Recipients: Yun Gao
> CC: user
> Subject: Re: Re: Flink Window with multiple trigger condition
>
>> Hi,
>>
>> I have implemented the below solution and it's working fine, but the
>> biggest problem with it is that if no event comes for the user after 30 minutes,
>> then I am not able to trigger, because I am checking the
>> time difference against upcoming events. So only when the next event comes does it
>> trigger, but I want it to trigger right after 30 minutes.
>>
>> So please help me improve this and solve the above problem.
>>
>>
>>
>> public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<String, GenericRecord>, DemandSessionSummaryTuple> {
>>
>>     private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);
>>
>>     private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintain session_id, start time and end time
>>     private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map of hex9 to summary tuple
>>
>>     @Override
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
>>                 new ValueStateDescriptor<>(
>>                         "time_state", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
>>                         }), // type information
>>                         Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
>>         timeState = getRuntimeContext().getState(timeDescriptor);
>>
>>         MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
>>                 new MapStateDescriptor<>("demand_session",
>>                         TypeInformation.of(new TypeHint<String>() {
>>                         }), TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
>>                         }));
>>         sessionSummary = getRuntimeContext().getMapState(descriptor);
>>
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<String, GenericRecord> recordTuple2, Collector<DemandSessionSummaryTuple> collector) throws Exception {
>>         GenericRecord record = recordTuple2.f1;
>>         String event_name = record.get("event_name").toString();
>>         long event_ts = (Long) record.get("event_ts");
>>         Tuple3<String, Long, Long> currentTimeState = timeState.value();
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>
>>         long timeDiff = event_ts - currentTimeState.f1;
>>
>>         if (event_name.equals("keyless_start_trip") || timeDiff >= 180) {
>>             Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
>>             sessionSummary.entries().forEach(tuple -> {
>>                 String key = tuple.getKey();
>>                 DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
>>                 try {
>>                     sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
>>                     collector.collect(sessionSummaryTuple);
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>             });
>>             timeState.clear();
>>             sessionSummary.clear();
>>             currentTimeState = timeState.value();
>>         }
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>         currentTimeState.f2 = event_ts;
>>
>>         if (currentTimeState.f1 > 0) {
>>             String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
>>             DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null ?

Re: Flink Elastic Sink

2020-05-30 Thread aj
Thanks, It worked.

I was confused before: I was thinking the sink builder is called only
once, but it gets called for every batch request; correct me if my
understanding is wrong.
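
For reference while digging out that code, here is a minimal sketch (types, index prefix, hosts and bulk settings are made up, not taken from this thread) of how the pieces usually fit together with the Elasticsearch 7 connector: the builder and the sink are created once when the job graph is built, while the ElasticsearchSinkFunction's process() runs for every record, so the per-day index can be derived there from the event timestamp:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DailyIndexSink {

    // Hypothetical record type: adapt the accessors to your Avro/GenericRecord fields.
    public static class Event {
        public long eventTs;        // epoch millis
        public String payloadJson;  // document body
    }

    public static ElasticsearchSink<Event> build() {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("localhost", 9200, "http"));

        // process() is invoked for every element; the builder below is only created once.
        ElasticsearchSinkFunction<Event> sinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                String day = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                        .withZone(ZoneOffset.UTC)
                        .format(Instant.ofEpochMilli(element.eventTs));

                Map<String, Object> source = new HashMap<>();
                source.put("payload", element.payloadJson);

                IndexRequest request = Requests.indexRequest()
                        .index("myindex_" + day)   // per-day index derived from the event time
                        .source(source);
                indexer.add(request);
            }
        };

        ElasticsearchSink.Builder<Event> builder = new ElasticsearchSink.Builder<>(hosts, sinkFunction);
        builder.setBulkFlushMaxActions(1000); // flush in batches of up to 1000 actions
        return builder.build();
    }
}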

On Fri, May 29, 2020 at 9:08 AM Leonard Xu  wrote:

> Hi,aj
>
> In the implementation of ElasticsearchSink, ElasticsearchSink won't
> create the index and only starts an Elasticsearch client for sending requests to
> the Elasticsearch cluster. You can simply extract the index (the date value in your
> case) from your timestamp field and put it into an IndexRequest [1];
> ElasticsearchSink will send the IndexRequests to the Elasticsearch cluster, and the
> Elasticsearch cluster will create the corresponding index and flush the records.
>
> BTW, if you're using Flink SQL you can use a dynamic index in the Elasticsearch
> SQL connector [2]; you can simply configure 'connector.index' =
> 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.
>
> Best,
> Leonard Xu
> [1]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
>
>
>
> On May 29, 2020, at 02:43, aj wrote:
>
> Hello All,
>
> I am getting many events in Kafka and I have written a Flink job that sinks
> the Avro records from Kafka to S3 in Parquet format.
>
> Now, I want to sink these records into Elasticsearch, but the challenge is
> that I want to write the records to time-based indices. Basically, in
> Elasticsearch I want to create a per-day index with the date as the suffix.
> So if I create an ES sink in the Flink streaming job, how do I make the sink
> start writing to a new index when the first event of the day arrives?
>
> Thanks,
> Anuj.
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: re: Submitted Flink job cannot create the TaskManager; the Flink web UI stays in CREATED state; error log below

2020-05-30 Thread 程龙






The job was submitted from code. The JobManager loads fine; it is only when the TaskManager starts that this directory is never created. The UI looks as follows, and the error log is the one I pasted below.










On 2020-05-30 19:16:57, "462329521" <462329...@qq.com> wrote:
>What is your submit command? It looks like the configuration file cannot be loaded.
>
>
>-- Original Message --
>From: "程龙" <13162790...@163.com>
>Sent: 2020-05-30 19:13
>To: "user-zh"
>Subject: Submitted Flink job cannot create the TaskManager; the Flink web UI stays in CREATED state; error log below
>
>
>
>2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - 
>2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - Registered UNIX signal handlers for [TERM, HUP, INT]
>2020-05-30 19:07:31,420 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - Current working Directory: /tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08
>2020-05-30 19:07:31,427 ERROR org.apache.flink.yarn.YarnTaskExecutorRunner - YARN TaskManager initialization failed.
>org.apache.flink.configuration.IllegalConfigurationException: The Flink config file '/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08/flink-conf.yaml' (/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08) does not exist.
>    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
>    at org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:114)
>    at org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:82)


re: Submitted Flink job cannot create the TaskManager; the Flink web UI stays in CREATED state; error log below

2020-05-30 Thread 462329521
What is your submit command? It looks like the configuration file cannot be loaded.


-- Original Message --
From: "程龙" <13162790...@163.com>
Sent: 2020-05-30 19:13
To: "user-zh"

Submitted Flink job cannot create the TaskManager; the Flink web UI stays in CREATED state; error log below

2020-05-30 Thread 程龙
2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - 

2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - Registered UNIX signal handlers for [TERM, HUP, INT]

2020-05-30 19:07:31,420 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - Current working Directory: /tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08

2020-05-30 19:07:31,427 ERROR org.apache.flink.yarn.YarnTaskExecutorRunner - YARN TaskManager initialization failed.

org.apache.flink.configuration.IllegalConfigurationException: The Flink config file '/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08/flink-conf.yaml' (/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08) does not exist.
    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
    at org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:114)
    at org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:82)

Reply: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-30 Thread wangl...@geekplus.com.cn

It was caused by a jar conflict and I have fixed it.

I had put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory,
and my project's pom file also has the flink-connector-kafka dependency and
is built as a fat jar.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: Leonard Xu
Sent: 2020-05-26 15:47
To: Aljoscha Krettek
CC: user
Subject: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
Hi, wanglei

I think Aljoscha is right. Could you post your dependency list?
The flink-connector-kafka dependency is used in DataStream applications, which is
what you should use; the flink-sql-connector-kafka dependency is used in Table API
& SQL applications. You should only add one of them, because the two dependencies
will conflict.

Best,
Leonard Xu

On May 26, 2020, at 15:02, Aljoscha Krettek wrote:

I think what might be happening is that you're mixing dependencies from the 
flink-sql-connector-kafka and the proper flink-connector-kafka that should be 
used with the DataStream API. Could that be the case?

Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:
Hi,
It would be helpful if you could provide the full stack trace, and tell us which Flink version 
and which Kafka connector version you are using.
It sounds like either a dependency convergence error (mixing Kafka 
dependencies/various versions of flink-connector-kafka inside a single job/jar) 
or some shading issue. Can you check your project for such issues (`mvn 
dependency:tree` command [1]).
Also what’s a bit suspicious for me is the return type:
Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
I’m not sure, but I was not aware that we are shading Kafka dependency in our 
connectors? Are you manually shading something?
Piotrek
[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
 

On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:


public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
                o.f1.getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
        "default", new MyKafkaSerializationSchema(),
        prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei
wangl...@geekplus.com.cn 




Re: Sorting Bounded Streams

2020-05-30 Thread Benchao Li
Hi Satyam,

You are correct. Blink planner is built on top of DataStream, both for
batch and streaming.
Hence you cannot transform Table into DataSet if you are using blink
planner.

AFAIK, the community is working on the unification of batch and streaming,
and that unification will be Table API/SQL on top of BoundedStream. Then the
DataSet/DataStream APIs will be unified as a BoundedStream API. Hence the
DataSet API is not the recommended approach for the long term.
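
To make the earlier batch-mode suggestion concrete, here is a minimal sketch (the table names and sort column are placeholders and assumed to be registered beforehand) of sorting on an arbitrary column with the blink planner in batch mode:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class BatchSortSketch {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // assumes a table "orders" has been registered, e.g. via a connector DDL
        Table sorted = tEnv.sqlQuery("SELECT * FROM orders ORDER BY amount DESC");

        // in batch mode the result is globally sorted; no time attribute is required
        sorted.insertInto("sorted_orders"); // assumes a registered sink table
        tEnv.execute("batch sort");
    }
}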


Satyam Shekhar wrote on Saturday, May 30, 2020, 3:34 PM:

> Thanks for your reply, Benchao Li.
>
> While I can use the Blink planner in batch mode, I'd still have to work
> with DataSet. Based on my limited reading it appears to me that DataStream
> is being extended to support both batch and streaming use-cases with the
> `isBounded` method in the StreamTableSource interface. Is that correct?
>
> Is working with DataSet the recommended approach for the long term? Are
> there any performance implications for that decision?
>
> Regards,
> Satyam
>
>
> On Fri, May 29, 2020 at 9:01 PM Benchao Li  wrote:
>
>> Hi Satyam,
>>
>> Are you using blink planner in streaming mode? AFAIK, blink planner in
>> batch mode can sort on arbitrary columns.
>>
>> Satyam Shekhar wrote on Saturday, May 30, 2020, 6:19 AM:
>>
>>> Hello,
>>>
>>> I am using Flink as the streaming execution engine for building a
>>> low-latency alerting application. The use case also requires ad-hoc
>>> querying on batch data, which I also plan to serve using Flink to avoid the
>>> complexity of maintaining two separate engines.
>>>
>>> My current understanding is that Order By operator in Blink planner (on
>>> DataStream) requires time attribute as the primary sort column. This is
>>> quite limiting for ad-hoc querying. It seems I can use the DataSet API to
>>> obtain a globally sorted output on an arbitrary column but that will force
>>> me to use the older Flink planner.
>>>
>>> Specifically, I am looking for guidance from the community on the
>>> following questions -
>>>
>>>1. Is it possible to obtain a globally sorted output on DataStreams
>>>on an arbitrary sort column?
>>>2. What are the tradeoffs in using DataSet vs DataStream in
>>>performance, long term support, etc?
>>>3. Is there any other way to address this issue?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li


Re: Sorting Bounded Streams

2020-05-30 Thread Satyam Shekhar
Thanks for your reply, Benchao Li.

While I can use the Blink planner in batch mode, I'd still have to work
with DataSet. Based on my limited reading it appears to me that DataStream
is being extended to support both batch and streaming use-cases with the
`isBounded` method in the StreamTableSource interface. Is that correct?

Is working with DataSet the recommended approach for the long term? Are
there any performance implications for that decision?

Regards,
Satyam


On Fri, May 29, 2020 at 9:01 PM Benchao Li  wrote:

> Hi Satyam,
>
> Are you using blink planner in streaming mode? AFAIK, blink planner in
> batch mode can sort on arbitrary columns.
>
> Satyam Shekhar wrote on Saturday, May 30, 2020, 6:19 AM:
>
>> Hello,
>>
>> I am using Flink as the streaming execution engine for building a
>> low-latency alerting application. The use case also requires ad-hoc
>> querying on batch data, which I also plan to serve using Flink to avoid the
>> complexity of maintaining two separate engines.
>>
>> My current understanding is that Order By operator in Blink planner (on
>> DataStream) requires time attribute as the primary sort column. This is
>> quite limiting for ad-hoc querying. It seems I can use the DataSet API to
>> obtain a globally sorted output on an arbitrary column but that will force
>> me to use the older Flink planner.
>>
>> Specifically, I am looking for guidance from the community on the
>> following questions -
>>
>>1. Is it possible to obtain a globally sorted output on DataStreams
>>on an arbitrary sort column?
>>2. What are the tradeoffs in using DataSet vs DataStream in
>>performance, long term support, etc?
>>3. Is there any other way to address this issue?
>>
>> Regards,
>> Satyam
>>
>
>
> --
>
> Best,
> Benchao Li
>