PyFlink resource optimization question

2021-04-05
Hi:
The business scenario: the source is Kafka, with about 10 GB of data every 10 minutes, covering 200 cities. The goal is a sliding window grouped by city that fires every 2 minutes: each city's data from the past 10 minutes is collected into its own list, converted to a pandas DataFrame, and a whole-window computation is run per city; on average each city's computation takes about 60 s.


Current usage:
1. slide_window = Slide.over("10.minutes").every("2.minutes").on("ts").alias("w")
2. The Kafka connector is registered with a SQL DDL statement.
3. The result table is a plain print sink:
CREATE TABLE sink (
    city_id STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    flag STRING
) WITH (
    'connector' = 'print'
)
4. A UDAF writes the source data to a CSV file (source.select("write_csv()")); the computation function is then invoked and reads the CSV contents back.
5. The computation is triggered by calling the custom function strategy_flow_entry inside select: source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()
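Independent of Flink, the per-city sliding-window grouping that steps 1 and 5 rely on can be sketched with the standard library alone (all names here are hypothetical, and event-time details such as watermarks and late data are ignored):

```python
from collections import defaultdict

def window_ends(ts, size=600, slide=120):
    """Ends (in seconds) of every 10-minute window, sliding by 2 minutes,
    that contains an event with timestamp ts."""
    end = (ts // slide + 1) * slide  # first window end strictly after ts
    ends = []
    while end <= ts + size:
        ends.append(end)
        end += slide
    return ends

def group_by_window_and_city(events, size=600, slide=120):
    """events: iterable of (ts_seconds, city_id, value).
    Returns {(window_end, city_id): [values]} -- the per-city lists that
    would each be handed to one pandas-based computation."""
    buckets = defaultdict(list)
    for ts, city, value in events:
        for end in window_ends(ts, size, slide):
            buckets[(end, city)].append(value)
    return buckets
```

Each (window_end, city_id) bucket corresponds to one invocation of the per-city computation; in the real job this grouping is what Slide.over(...).every(...) plus group_by("w, city_id") expresses.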

Run this way, the job keeps hitting all kinds of problems, such as data latency.


So I'd like to ask:
1. What is the recommended approach for this scenario? Is the current approach wrong?
2. How should the job submission parameters be set: CPU cores, memory, parallelism, slots, and so on?


Many thanks

PyFlink: how to set a timeout for a UDF

2021-02-02
Hi:

Hope you are doing well!

My UDF always runs for a long time, so I'm wondering how to set a timeout for a
UDF in PyFlink, in order to automatically stop the execution when it runs too
long.
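As far as I know there is no documented UDF-timeout setting to point to here, so as a plain-Python workaround sketch (all names are hypothetical), one can bound the expensive part inside the UDF body itself:

```python
import concurrent.futures

def run_with_timeout(fn, args=(), timeout_s=60.0, default=None):
    """Run fn(*args) in a worker thread and wait at most timeout_s seconds.
    On timeout, return `default` instead; note the worker thread itself is
    not killed -- Python offers no safe way to force-stop it."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        return default
    finally:
        pool.shutdown(wait=False)
```

Inside the UDF one would wrap the slow computation, e.g. `result = run_with_timeout(heavy_compute, (batch,), timeout_s=60)`, and emit a fallback value when it returns the default. Whether this interacts cleanly with PyFlink's Python worker lifecycle is an assumption to verify.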

Many Thanks!


PyFlink: "Expected IPC message of type schema but got record batch"

2021-01-30
Hi:

Hope you are doing well! I have a question about PyFlink; details below:

Use case: sliding windows of size 10 minutes that slide by 5 minutes for data
aggregation, followed by further processing; roughly 2 GB of data and 1 million records per window.

Job params:

bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
-Dyarn.containers.vcores=4 \
-Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
-Dtaskmanager.memory.managed.fraction=0.7 \
-Dtaskmanager.memory.task.off-heap.size=5120m \
-nm $task_name -qu $queue -d
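The failure below happens while the Python worker is decoding Arrow record batches, so one knob that may be worth experimenting with (this is a guess, not a confirmed fix) is the Arrow batch size PyFlink uses when shipping rows to the Python process, set in flink-conf.yaml or via -D:

```yaml
# Default is 10000 rows per Arrow batch; a smaller value shrinks each
# IPC message at the cost of producing more batches.
python.fn-execution.arrow.batch.size: 2000
```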


The exception message is as follows:

Traceback (most recent call last):
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 253, in _execute
response = task()
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 310, in 
lambda: self.create_worker().do_instruction(request), request)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 515, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 978, in process_bundle
element.data)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 218, in process_encoded
self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 627, in decode_from_stream
yield self._decode_one_batch_from_stream(in_stream, 
in_stream.read_var_int64())
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 638, in _decode_one_batch_from_stream
return arrow_to_pandas(self._timezone, self._field_types, 
[next(self._batch_reader)])
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 631, in _load_from_stream
reader = pa.ipc.open_stream(stream)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 137, in open_stream
return RecordBatchStreamReader(source)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 61, in