RE: Processing data from Kafka. Python

2023-12-20 Thread Поротиков Станислав Вячеславович via user
It seems to be fixed by adding an option to the Java expansion service:
"--experiments=use_deprecated_read"
I found a related ticket: https://issues.apache.org/jira/browse/BEAM-11991
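
For reference, a minimal sketch of how that flag can be appended to the Java KafkaIO expansion service, assuming the default_io_expansion_service setup from the November thread at the end of this digest (the PROCESS-environment args are taken from there; your set may differ):

from apache_beam.io.kafka import default_io_expansion_service

# Sketch: pass the experiment flag to the Java expansion service that
# expands ReadFromKafka/WriteToKafka, alongside the existing environment args.
kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}",
        "--experiments=use_deprecated_read",
    ]
)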

Best regards,
Stanislav Porotikov


Processing data from Kafka. Python

2023-12-19 Thread Поротиков Станислав Вячеславович via user
I'm trying to read data from Kafka, do some processing, and then write the results to another Kafka topic.
The problem is that the job seems to be stuck at the processing stage.
In the logs I can see it constantly reading data from Kafka, but no new data appears in the sink Kafka topic.
Could you help me figure out what I did wrong?

My pipeline:

import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_flink_environment = [
    "--runner=FlinkRunner",
    "--flink_submit_uber_jar",
    "--streaming",
    "--flink_master=localhost:8081",
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/apache/beam/boot\"}"
]


def run():
    pipeline_options = PipelineOptions(pipeline_flink_environment)

    # source_config, source_topic, sink_topic and kafka_process_expansion_service
    # are defined as in the November thread at the end of this digest.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        kafka_message = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                  consumer_config=source_config,
                  topics=[source_topic],
                  expansion_service=kafka_process_expansion_service)
            | beam.WindowInto(beam.window.FixedWindows(15))
            | 'Group elements' >> beam.GroupByKey()
            | 'Write data to Kafka' >> WriteToKafka(
                  producer_config=source_config,
                  topic=sink_topic,
                  expansion_service=kafka_process_expansion_service)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()



Here are a few log lines that I can see related to the Python worker:

2023-12-19 08:18:04,634 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: Impulse -> [3]Read topic from 
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
 KafkaIO.ReadSourceDescriptors} (1/1)#0 
856b8acfe73098d7075a2636a645f66d_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-12-19 08:18:05,581 INFO  
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn 
Logging client connected.
2023-12-19 08:18:05,626 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: app_name
2023-12-19 08:18:05,627 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: flink_conf_dir
2023-12-19 08:18:05,628 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111
 [] - semi_persistent_directory: /tmp
2023-12-19 08:18:05,628 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:356
 [] - No session file found: /tmp/staged/pickled_main_session. Functions 
defined in __main__ (interactive session) may fail.
2023-12-19 08:18:05,629 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:367
 [] - Discarding unparseable args: ['--direct_runner_use_stacked_bundle', 
'--options_id=1', '--pipeline_type_check']
2023-12-19 08:18:05,629 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135
 [] - Pipeline_options: {'streaming': True, 'job_name': 
'BeamApp-flink-1219081730-11566b15', 'gcp_oauth_scopes': 
['https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data', 
'https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data'], 
'default_sdk_harness_log_level': 'DEBUG', 'experiments': ['beam_fn_api'], 
'sdk_location': 'container', 'environment_type': 'PROCESS', 
'environment_config': '{"command":"/opt/apache/beam/boot"}', 
'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 
'flink_submit_uber_jar': True}
2023-12-19 08:18:05,672 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/statecache.py:234
 [] - Creating state cache with size 104857600
2023-12-19 08:18:05,672 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:187
 [] - Creating insecure control channel for localhost:35427.
2023-12-19 08:18:05,679 INFO  

RE: Why do we need Job Server?

2023-12-05 Thread Поротиков Станислав Вячеславович via user
I guess it's about the undocumented --flink_submit_uber_jar pipeline option.

Best regards,
Stanislav Porotikov

From: Alexey Romanenko 
Sent: Tuesday, December 5, 2023 3:48 PM
To: user 
Subject: Re: Why do we need Job Server?

Oh, interesting. I didn’t know about that possibility, thanks

—
Alexey


On 4 Dec 2023, at 18:14, Robert Bradshaw via user <user@beam.apache.org> wrote:

Note that this shouldn't be strictly necessary; e.g., for Python one can embed 
the pipeline definition into the jar itself, which is then just uploaded as an 
"ordinary" Flink executable jar to the Flink master: 
https://github.com/apache/beam/blob/release-2.52.0/sdks/python/apache_beam/runners/portability/abstract_job_service.py#L301
If Java doesn't do this yet, we should probably update it to do so.
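
For the Python SDK, a minimal sketch of this job-server-less submission, using the flags already shown in the Kafka thread above (the Flink master address is that thread's example, not a requirement):

from apache_beam.options.pipeline_options import PipelineOptions

# Build the pipeline into a self-executing uber jar and upload it
# straight to the Flink master, with no separate Beam job server.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--flink_submit_uber_jar",
    "--streaming",
])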

On Mon, Dec 4, 2023 at 7:10 AM Alexey Romanenko <aromanenko@gmail.com> wrote:
There are two modes to run a job with the FlinkRunner: Portable and Classic. If 
you run a job in Portable mode, then you need to start a JobService configured 
with your Flink cluster and submit your job through it. If you run a job in 
Classic mode (Java SDK pipelines only), then you don't need one.

More information on this is here:
Apache Flink Runner: https://beam.apache.org/documentation/runners/flink/
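
For the Portable mode case, a minimal sketch of the matching pipeline options, assuming a job server like the apache/beam_flink1.16_job_server container from the docker-compose file later in this digest (localhost and the 8098/8099 ports mirror that setup):

from apache_beam.options.pipeline_options import PipelineOptions

# Portable mode: hand the job to a running Beam JobService,
# which in turn submits it to the Flink cluster.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",       # JobService gRPC endpoint
    "--artifact_endpoint=localhost:8098",  # artifact staging endpoint
])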



—
Alexey



On 4 Dec 2023, at 07:53, Поротиков Станислав Вячеславович via user <user@beam.apache.org> wrote:

Hello!
I want to know in which cases I would need to use a separate job server for 
submitting jobs to a Flink cluster, and in which cases we don't need it at all.


Best regards,
Stanislav Porotikov




RE: Beam detached mode?

2023-12-04 Thread Поротиков Станислав Вячеславович via user
Thank you!
But can we achieve this through Beam, by providing some settings to the Python 
pipeline?

From: Pavel Solomin 
Sent: Monday, December 4, 2023 6:27 PM
To: user@beam.apache.org
Cc: Поротиков Станислав Вячеславович 
Subject: Re: Beam detached mode?

Hello!

Yes. You can run it like this:


flink run --class com.my.BeamFlinkApp --detached ...
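
On the Beam side, a hedged sketch for Python: call run() without wait_until_finish(), assuming the runner hands the job off asynchronously (a "with beam.Pipeline(...)" block would instead wait for the job to finish on exit):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Flags as elsewhere in this digest; FlinkRunner here is an assumption.
options = PipelineOptions(["--runner=FlinkRunner", "--flink_master=localhost:8081"])

pipeline = beam.Pipeline(options=options)
_ = pipeline | beam.Impulse()  # placeholder; build the real pipeline here
# run() returns a PipelineResult; skipping result.wait_until_finish()
# lets the submitting process exit once the job has been handed off.
result = pipeline.run()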

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | LinkedIn: https://www.linkedin.com/in/pavelsolomin




On Mon, 4 Dec 2023 at 12:45, Поротиков Станислав Вячеславович via user <user@beam.apache.org> wrote:
Hello!
Is there any option in Beam to run a pipeline in detached mode?
I want to submit the job to Flink and then exit without any errors.

Best regards,
Stanislav Porotikov



Beam detached mode?

2023-12-04 Thread Поротиков Станислав Вячеславович via user
Hello!
Is there any option in Beam to run a pipeline in detached mode?
I want to submit the job to Flink and then exit without any errors.

Best regards,
Stanislav Porotikov



Why do we need Job Server?

2023-12-03 Thread Поротиков Станислав Вячеславович via user
Hello!
I want to know in which cases I would need to use a separate job server for 
submitting jobs to a Flink cluster, and in which cases we don't need it at all.


Best regards,
Stanislav Porotikov



Control who can submit Beam jobs

2023-11-30 Thread Поротиков Станислав Вячеславович via user
Hello!
Is there any way to control who can submit jobs to a Flink cluster? We have 
multiple teams, and I am looking for a way to use Beam+Flink safely.

Best regards,
Stanislav Porotikov



Error while trying to connect to Kafka from Flink runner

2023-11-22 Thread Поротиков Станислав Вячеславович via user
Hello!
I am trying to run a Beam pipeline in a local docker-compose environment on top 
of Flink. I wrote my own Dockerfile for the Flink jobmanager and taskmanager.
Dockerfile for my-image-apache-beam/flink:1.16-java11:
FROM flink:1.16-java11

# Python SDK
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/

# Java SDK
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/

COPY krb5.conf /etc/

My docker-compose.yml:
version: "2.2"
services:
  jobmanager:
    image: my-image-apache-beam/flink:1.16-java11
    ports:
      - "8081:8081"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: registry.kontur.host/srs/apache-beam/flink:1.16-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8100-8200:8100-8200"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    scale: 1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2Gb

  beam_job_server:
    image: apache/beam_flink1.16_job_server
    command: --flink-master=jobmanager --job-host=0.0.0.0
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

  python-worker-harness:
    image: "apache/beam_python3.10_sdk"
    command: "-worker_pool"
    ports:
      - "5:5"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

volumes:
  artifacts:
And finally, my pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

import os
import logging


job_server = "localhost"

pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:5"
]

kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}"
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)

    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')

    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config': f'com.sun.security.auth.module.Krb5LoginModule required debug=true principal={sasl_kerberos_principal} useTicketCache=true;',
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest'
    }

    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (pipeline
                   | 'Read topic from Kafka' >> ReadFromKafka(
                         consumer_config=source_config,
                         topics=[source_topic],
                         expansion_service=kafka_process_expansion_service)
                   | 'Write topic to Kafka' >> WriteToKafka(
                         producer_config=source_config,
                         topic=sink_topic,
                         expansion_service=kafka_process_expansion_service)
                   )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

But I got stuck with the error below:
INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 
1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id 
.
2023-11-22 12:52:29,065 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INFO