[jira] [Commented] (BEAM-3788) Implement a Kafka IO for Python SDK

2020-03-15 Thread Nicolae Rosia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059941#comment-17059941
 ] 

Nicolae Rosia commented on BEAM-3788:
-

until this is fixed, is there a way to implement an unbounded source in Python? 
If yes, then I could work around this by using a Kafka library in Python, such 
as [https://pypi.org/project/kafka-python/]

> Implement a Kafka IO for Python SDK
> ---
>
> Key: BEAM-3788
> URL: https://issues.apache.org/jira/browse/BEAM-3788
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>
> Java KafkaIO will be made available to Python users as a cross-language 
> transform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-03-15 Thread Nicolae Rosia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059596#comment-17059596
 ] 

Nicolae Rosia commented on BEAM-9046:
-

Hello, I have the same issue with either of these versions:
 * 2.19.0
 * 2.20.0
 * master

+ Flink 1.9 + Kafka

Is there a known working example from which I could start?

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -
>
> Key: BEAM-9046
> URL: https://issues.apache.org/jira/browse/BEAM-9046
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-kafka
>Affects Versions: 2.16.0
>Reporter: Berkay Öztürk
>Priority: Major
>  Labels: KafkaIO, Python
> Fix For: Not applicable
>
>
>  I'm trying to read the data streaming from Apache Kafka using the Python SDK 
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink 
> 1.8.3, I follow these steps:
>  * Compile and run Beam 2.16 with Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 
> https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow 
> -PflinkMasterUrl=localhost:8081 &
> {code}
>  * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
> if __name__ == '__main__':
> with Pipeline(options=PipelineOptions([
> '--runner=FlinkRunner',
> '--flink_version=1.8',
> '--flink_master_url=localhost:8081',
> '--environment_type=LOOPBACK',
> '--streaming'
> ])) as pipeline:
> (
> pipeline
> | 'read' >> ReadFromKafka({'bootstrap.servers': 
> 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
> )
> result = pipeline.run()
> result.wait_until_finish()
> {code}
>  * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error 
> during job invocation BeamApp-USER-somejob. 
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: xxx)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> at 
> org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
> at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
> at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
> at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
> at 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 13 more
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
> at 
> org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
> at 
> org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
> at 
>