It took me some time to set up the Java test (using Java after more than a
decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work
fine.
Here's the relevant Java code. The only difference from the Python version
is that I had to extract the KV from the KafkaRecord object and construct a
PCollection<KV> explicitly before writing to the output topic.
~~~~~~~~
package org.apache.beam.kafka.test;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaTest {

  static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
  static final String INPUT_TOPIC = "in_topic";              // Default input kafka topic name
  static final String OUTPUT_TOPIC = "out_topic";            // Default output kafka topic name

  /** Specific pipeline options. */
  public interface KafkaTestOptions extends PipelineOptions {
    @Description("Kafka bootstrap servers")
    @Default.String(BOOTSTRAP_SERVERS)
    String getBootstrap();

    void setBootstrap(String value);

    @Description("Kafka input topic name")
    @Default.String(INPUT_TOPIC)
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Kafka output topic name")
    @Default.String(OUTPUT_TOPIC)
    String getOutputTopic();

    void setOutputTopic(String value);
  }

  public static void main(String[] args) throws Exception {
    final KafkaTestOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        .apply(
            "PrepareForWriting",
            ParDo.of(
                new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
                  }
                }))
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>write()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getOutputTopic())
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));

    pipeline.run();
  }
}
~~~~~~~~
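
As an aside, if I'm reading the KafkaIO API correctly, the explicit
"PrepareForWriting" ParDo may not even be necessary: read().withoutMetadata()
should drop the KafkaRecord wrapper and yield KV<String, String> elements that
can be written back out directly. An untested sketch of that variant (reusing
the same options, deserializer and serializer classes as above):

~~~~~~~~
    // Untested sketch: withoutMetadata() returns a PCollection<KV<String, String>>,
    // so the elements can go straight into KafkaIO.write() without a ParDo.
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>write()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getOutputTopic())
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));
~~~~~~~~
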
I'm firing the Java version as follows:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest \
    -Pflink-runner -Dexec.args="--runner=FlinkRunner"
And I can see, in real time, that as I publish records to in_topic, they
arrive on out_topic continuously.
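
For reference, I'm verifying the flow with kafkacat, the same way as earlier
in this thread (publish to in_topic in one terminal, tail out_topic in
another):

$ kafkacat -P -b localhost:29092 -t in_topic -K:    # publish key:value records
$ kafkacat -C -b localhost:29092 -t out_topic -f 'Key: %k Value: %s\n'
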
I hope this helps narrow down the issue.
Thanks,
Sumeet
On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]>
wrote:
> Are you able to run a similar Java streaming pipeline using KafkaIO and
> Flink ? (without x-lang)
>
> Thanks,
> Cham
>
> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]>
> wrote:
>
>> Hi Cham!
>>
>> So finally I was able to get partial success. Since I had pre-populated
>> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
>> if it can read all existing records, as follows:
>>
>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>   _ = (
>>       pipeline
>>       | 'Read from kafka' >> ReadFromKafka(
>>           consumer_config={
>>               'bootstrap.servers': bootstrap_servers,
>>               'auto.offset.reset': 'earliest'},
>>           topics=[in_topic],
>>           max_num_records=3)
>>       | 'Write to kafka' >> WriteToKafka(
>>           producer_config={
>>               'bootstrap.servers': bootstrap_servers},
>>           topic=out_topic))
>>
>> I was able to see all 3 records being read, and written successfully to
>> the out_topic as well. So, it appears that there might be some issue with
>> reading unbounded Kafka streams here? Or is there any setting that I might
>> be missing?
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Hey Cham!
>>>
>>> Appreciate the response. I tried out your suggestions (details below),
>>> but I still don't see any data being consumed or written back to Kafka (as
>>> per your suggestion). I'm also providing additional details/context that
>>> might help narrow down the issue. Apologies for being a bit verbose from
>>> hereon!
>>>
>>> First, here's what my pipeline code looks like now:
>>>
>>> ~~~~~~
>>> import logging
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.io.kafka import WriteToKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>
>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>   pipeline_options = PipelineOptions(
>>>       pipeline_args, save_main_session=True, streaming=True)
>>>
>>>   logging.info(
>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>       str(bootstrap_servers), in_topic, out_topic)
>>>
>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>     _ = (
>>>         pipeline
>>>         | 'Read from kafka' >> ReadFromKafka(
>>>             consumer_config={
>>>                 'bootstrap.servers': bootstrap_servers,
>>>                 'auto.offset.reset': 'earliest'
>>>             },
>>>             topics=[in_topic])
>>>         | 'Write to kafka' >> WriteToKafka(
>>>             producer_config={
>>>                 'bootstrap.servers': bootstrap_servers
>>>             },
>>>             topic=out_topic))
>>>
>>>
>>> if __name__ == '__main__':
>>>   logging.getLogger().setLevel(logging.INFO)
>>>   import argparse
>>>
>>>   parser = argparse.ArgumentParser()
>>>   parser.add_argument(
>>>       '--bootstrap_servers',
>>>       dest='bootstrap_servers',
>>>       required=True,
>>>       help='Bootstrap servers for the Kafka cluster')
>>>   parser.add_argument(
>>>       '--in_topic',
>>>       dest='in_topic',
>>>       required=True,
>>>       help='Kafka topic to read data from')
>>>   parser.add_argument(
>>>       '--out_topic',
>>>       dest='out_topic',
>>>       required=True,
>>>       help='Kafka topic to write data to')
>>>   known_args, pipeline_args = parser.parse_known_args()
>>>
>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>       known_args.out_topic, pipeline_args)
>>> ~~~~~~
>>>
>>> I'm firing this pipeline as follows:
>>>
>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>
>>> I have pre-populated the Kafka topic with 3 records:
>>>
>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>> v1
>>> v2
>>> v3
>>>
>>> Now, when I execute the pipeline, I see that it starts to read records
>>> from offset 0, but then seeks to the latest offset 3 without processing the
>>> records. I don't see any data written to out_topic. I filtered out the logs
>>> a bit, and this is what I'm seeing:
>>>
>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092
>>> in_topic=in_topic out_topic=out_topic
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to
>>> split 0 (total 1): in_topic-0'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>> in_topic-0 to offset 0.'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from
>>> in_topic-0 starting at offset 0'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>> partition(s): in_topic-0'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset
>>> of partition in_topic-0'
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>>> partition in_topic-0 to offset 3.'
>>>
>>> Additionally, the logs also emit complete consumer and producer configs.
>>> I'm dumping them here, in case that helps:
>>>
>>> Consumer Config:
>>>
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics =
>>> true'
>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms =
>>> 5000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>> earliest'
>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>> [localhost:29092]'
>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
>>> 540000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms =
>>> 60000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics =
>>> true'
>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>> Reader-0_offset_consumer_1947524890_none'
>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms =
>>> 3000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>> = true'
>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>> read_uncommitted'
>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class
>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes =
>>> 1048576'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms =
>>> 300000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>> 300000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>>> INFO'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
>>> 30000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>> 65536'
>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
>>> 1000'
>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>>> /usr/bin/kinit'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>> = 60000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>> = 0.05'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>> = 0.8'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>> = 300'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>> = 60'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>> = 0.8'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>> = 0.05'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>> PLAINTEXT'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>> = https'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
>>> SunX509'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>> = PKIX'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class
>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>
>>> Producer Config:
>>>
>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>> [localhost:29092]'
>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
>>> 540000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms =
>>> 120000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>> = 5'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>> 300000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>>> INFO'
>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
>>> 30000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class
>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>> 32768'
>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
>>> 1000'
>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>>> /usr/bin/kinit'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>> = 60000'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>> = 0.05'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>> = 0.8'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>> = 300'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>> = 60'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>> = 0.8'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>> = 0.05'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>> PLAINTEXT'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>> = https'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
>>> SunX509'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>> = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>> = PKIX'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>>> null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms =
>>> 60000'
>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class
>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>
>>>
>>> Apologies again for dumping almost everything here :-) Any pointers on
>>> what might be the issue are appreciated.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>>
>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> Also can you try sending messages back to Kafka (or another distributed
>>>> system like GCS) instead of just printing them ? (given that multi-language
>>>> pipelines run SDK containers in Docker you might not see prints in the
>>>> original console I think).
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Sumeet,
>>>>>
>>>>> It seems like your kafka consumer uses the LATEST offset(which is
>>>>> default setting) as the start offset to read, which is 29. Do you have
>>>>> more
>>>>> than 29 records to read at that point? If the pipeline is only for testing
>>>>> purpose, I would recommend reading from earliest offset to see whether you
>>>>> get records. You can do so by constructing your ReadFromKafka like:
>>>>> ReadFromKafka(
>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>     topics=['test'])
>>>>>
>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm trying out a simple example of reading data off a Kafka topic
>>>>>> into Apache Beam. Here's the relevant snippet:
>>>>>>
>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>   _ = (
>>>>>>       pipeline
>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>           topics=['test'])
>>>>>>       | 'Print' >> beam.Map(print))
>>>>>>
>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>> coming in. Kafka is running locally in a docker container, and I'm able
>>>>>> to
>>>>>> use `kafkacat` from the host (outside the container) to publish and
>>>>>> subscribe to messages. So, I guess there are no issues on that front.
>>>>>>
>>>>>> It appears that Beam is able to connect to Kafka and get notified of
>>>>>> new messages, as I see the offset changes in the Beam logs as I publish
>>>>>> data from `kafkacat`:
>>>>>>
>>>>>> INFO:root:severity: INFO
>>>>>> timestamp {
>>>>>> seconds: 1612886861
>>>>>> nanos: 534000000
>>>>>> }
>>>>>> message: "[Consumer
>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST
>>>>>> offset
>>>>>> of partition test-0"
>>>>>> log_location:
>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>> thread: "22"
>>>>>>
>>>>>> INFO:root:severity: INFO
>>>>>> timestamp {
>>>>>> seconds: 1612886861
>>>>>> nanos: 537000000
>>>>>> }
>>>>>> message: "[Consumer
>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
>>>>>> partition test-0 to offset 29."
>>>>>> log_location:
>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>> thread: "22"
>>>>>>
>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>
>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>> 1:foo
>>>>>> 1:bar
>>>>>>
>>>>>> and I can confirm that its being received, again using `kafkacat`:
>>>>>>
>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>> Key: 1 Value: foo
>>>>>> Key: 1 Value: bar
>>>>>>
>>>>>> But despite this, I don't see the actual message being printed by
>>>>>> Beam as I expected. Any pointers to what's missing here are appreciated.
>>>>>> I'm suspecting this could be a decoding issue on the Beam pipeline side,
>>>>>> but could be incorrect.
>>>>>>
>>>>>> Thanks in advance for any pointers!
>>>>>>
>>>>>> Cheers,
>>>>>> Sumeet
>>>>>>
>>>>>