RE: Processing data from Kafka. Python
It seems to be fixed by adding the option "--experiments=use_deprecated_read" to the Java expansion service. I have found a related ticket: https://issues.apache.org/jira/browse/BEAM-11991

Best regards,
Stanislav Porotikov

From: Поротиков Станислав Вячеславович via user
Sent: Tuesday, December 19, 2023 1:58 PM
To: user@beam.apache.org
Cc: Поротиков Станислав Вячеславович
Subject: Processing data from Kafka. Python
Processing data from Kafka. Python

I'm trying to read data from Kafka, do some processing, and then write the new data to another Kafka topic. The problem is that the task is probably stuck at the processing stage. In the logs I can see that it reads data from Kafka constantly, but no new data appears in the sink Kafka topic. Could you help me figure out what I did wrong?

My pipeline:

    import logging

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    # source_config, source_topic, sink_topic and kafka_process_expansion_service
    # are defined elsewhere (not shown in this message).

    pipeline_flink_environment = [
        "--runner=FlinkRunner",
        "--flink_submit_uber_jar",
        "--streaming",
        "--flink_master=localhost:8081",
        "--environment_type=PROCESS",
        "--environment_config={\"command\":\"/opt/apache/beam/boot\"}"
    ]


    def run():
        pipeline_options = PipelineOptions(pipeline_flink_environment)
        with beam.Pipeline(options=pipeline_options) as pipeline:
            kafka_message = (
                pipeline
                # Cross-language Kafka read, expanded by the Java expansion service
                | 'Read topic from Kafka' >> ReadFromKafka(
                    consumer_config=source_config,
                    topics=[source_topic],
                    expansion_service=kafka_process_expansion_service)
                # 15-second fixed windows
                | beam.WindowInto(beam.window.FixedWindows(15))
                | 'Group elements' >> beam.GroupByKey()
                | 'Write data to Kafka' >> WriteToKafka(
                    producer_config=source_config,
                    topic=sink_topic,
                    expansion_service=kafka_process_expansion_service)
            )


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()

Here are just a few of the log lines I can see that are related to the Python worker:

2023-12-19 08:18:04,634 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 856b8acfe73098d7075a2636a645f66d_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-12-19 08:18:05,581 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-12-19 08:18:05,626 WARN /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291 [] - Not setting flag with value None: app_name
2023-12-19 08:18:05,627 WARN /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291 [] - Not setting flag with value None: flink_conf_dir
2023-12-19 08:18:05,628 INFO /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111 [] - semi_persistent_directory: /tmp
2023-12-19 08:18:05,628 WARN /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:356 [] - No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail.
2023-12-19 08:18:05,629 WARN /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:367 [] - Discarding unparseable args: ['--direct_runner_use_stacked_bundle', '--options_id=1', '--pipeline_type_check']
2023-12-19 08:18:05,629 INFO /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135 [] - Pipeline_options: {'streaming': True, 'job_name': 'BeamApp-flink-1219081730-11566b15', 'gcp_oauth_scopes': ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data', 'https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data'], 'default_sdk_harness_log_level': 'DEBUG', 'experiments': ['beam_fn_api'], 'sdk_location': 'container', 'environment_type': 'PROCESS', 'environment_config': '{"command":"/opt/apache/beam/boot"}', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'flink_submit_uber_jar': True}
2023-12-19 08:18:05,672 INFO /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/statecache.py:234 [] - Creating state cache with size 104857600
2023-12-19 08:18:05,672 INFO /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:187 [] - Creating insecure control channel for localhost:35427.
2023-12-19 08:18:05,679 INFO
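One way to apply the "--experiments=use_deprecated_read" workaround from the reply at the top of this thread is to start the Java expansion service yourself with that flag and point the Kafka transforms at it. The "ReadFromKafkaViaSDF" step in the log above suggests the read currently runs as a Splittable DoFn; the experiment is expected to switch KafkaIO back to its legacy unbounded-source read. The sketch below only builds the launch command. It assumes the expansion-service jar takes the port as its first argument followed by Beam pipeline options; the jar file name, port 8097, and the helper name expansion_service_command are hypothetical choices for illustration.

```python
from typing import List, Sequence


def expansion_service_command(
    jar_path: str,
    port: int,
    experiments: Sequence[str] = ("use_deprecated_read",),
) -> List[str]:
    """Build a command line for launching a Java expansion service jar.

    Hypothetical helper. Assumption: the jar takes the port as its first
    argument and accepts Beam pipeline options (e.g. --experiments) after it.
    """
    cmd = ["java", "-jar", jar_path, str(port)]
    if experiments:
        # Java pipeline options accept a comma-separated list for --experiments.
        cmd.append("--experiments=" + ",".join(experiments))
    return cmd


if __name__ == "__main__":
    # Hypothetical jar name: use the expansion-service jar matching your Beam version.
    cmd = expansion_service_command(
        "beam-sdks-java-io-expansion-service-<version>.jar", 8097)
    print(" ".join(cmd))
```

The resulting command could be started by hand or with subprocess.Popen(cmd); ReadFromKafka(..., expansion_service='localhost:8097') and the matching WriteToKafka would then use that service in place of kafka_process_expansion_service.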
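A separate point worth checking in the pipeline above: beam.GroupByKey() emits (key, iterable_of_values) pairs, while WriteToKafka with its default ByteArraySerializer key and value serializers expects (bytes, bytes) pairs. Once the read is emitting data, the grouped output likely has to be collapsed back into one byte string per key before the write step. A minimal sketch, assuming the values are bytes and that joining them with a newline is an acceptable encoding for the sink topic; the helper name to_kafka_record and the separator are hypothetical.

```python
from typing import Iterable, Optional, Tuple


def to_kafka_record(
    grouped: Tuple[Optional[bytes], Iterable[bytes]],
    separator: bytes = b"\n",
) -> Tuple[Optional[bytes], bytes]:
    """Collapse one GroupByKey output element into a single (key, value) pair.

    Hypothetical helper: joins all values grouped under a key (within one
    window) with `separator`, so the result is again a (bytes, bytes) pair.
    """
    key, values = grouped
    return key, separator.join(values)


if __name__ == "__main__":
    # Shape of a GroupByKey output element: (key, iterable of values).
    print(to_kafka_record((b"user-1", [b"a", b"b", b"c"])))
```

In the pipeline it would sit between the grouping and the write, e.g. | 'Encode groups' >> beam.Map(to_kafka_record) right after 'Group elements'.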