alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1139770934


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file
+
+The `TextIO` class in Apache Beam provides a way to read and write text files from **Google Cloud Storage (GCS)** in a pipeline.
+To write data to a file on **GCS**, you can use the `Write` method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := beam.Create(s, "Hello, World!")
+textio.Write(s, "gs://mybucket/myfile.txt", lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("gs://mybucket/myfile.txt"));
+pipeline.run();
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt')
+p.run()
+```
+{{end}}
+
+{{if (eq .Sdk "go")}}
+It is important to note that in order to interact with GCS you will need to set up authentication. The Go SDK uses Application Default Credentials, so you can point the **GOOGLE_APPLICATION_CREDENTIALS** environment variable at a service account key file (or run `gcloud auth application-default login`) before starting the pipeline:
+
+```
+export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication. You can do that by setting the **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by configuring `GoogleCloudOptions` on your pipeline options and supplying credentials, for example:
+
+```
+from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
+from oauth2client.client import GoogleCredentials
+
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://my-bucket/staging'
+google_cloud_options.temp_location = 'gs://my-bucket/temp'
+google_cloud_options.region = 'us-central1'
+
+# set credentials
+credentials = GoogleCredentials.get_application_default()
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication and specify a GCS temp location, which you can pass on the command line as an additional parameter:
+```
+--tempLocation=gs://my-bucket/temp
+```
+{{end}}

Review Comment:
   Please add the playground exercise and the challenge



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-write/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the `Read` method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the `Write` method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := beam.Create(s, "Hello, World!")
+textio.Write(s, "myfile.txt", lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that with plain file paths like these, the `Read` and `Write` methods work against the local file system; to read from or write to distributed file systems such as **HDFS**, **GCS**, or **S3**, you need to use the corresponding scheme prefix (for example "**gs://**").

Review Comment:
   Please add playground exercise description and challenge



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the `Read` method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:

Review Comment:
   The `TextIO` class in Apache Beam provides a way to read and write text 
files in a pipeline. To read a local file using TextIO, you can use the Read 
method and pass in the file path as a string. Here is an example of reading a 
local text file named "**myfile.txt**" and printing its contents:



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file

Review Comment:
   ### Reading from local text files using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the `Read` method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the `Write` method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := beam.Create(s, "Hello, World!")
+textio.Write(s, "myfile.txt", lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that with plain file paths like these, the `Read` and `Write` methods work against the local file system; to read from or write to distributed file systems such as **HDFS**, **GCS**, or **S3**, you need to use the corresponding scheme prefix (for example "**gs://**").

Review Comment:
   ### Playground Exercise
   In the playground window, you can find an example that reads from a text 
file and outputs individual words found in the text. Can you modify this 
example to output found words to another file in reverse form? 



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the `Read` method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the `Write` method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := beam.Create(s, "Hello, World!")
+textio.Write(s, "myfile.txt", lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}

Review Comment:
   Please add sections describing how to read from multiple files and how to 
use filenames in PCollection.
   
   Something like:
   With TextIO, you can also read from multiple files by passing a filepattern 
instead of a name. For example:
   examples of using file patterns
   
   You can also use filenames in PCollection to read data from. To do that, 
apply 'readAll()' to the collection which contains filenames:
   examples of applying readAll to PCollection
   
   With filepatterns, you can watch for new files matching the pattern and read 
once they appear. For example: 
   example of watching for new files
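
   For the filepattern and `readAll()` parts, a hedged Java sketch (bucket and file names are illustrative, not from the PR):

   ```
   // Read every object matching a filepattern instead of a single file name.
   PCollection<String> lines =
       p.apply(TextIO.read().from("gs://mybucket/logs/*.txt"));

   // Read from filenames carried in a PCollection.
   PCollection<String> files =
       p.apply(Create.of("gs://mybucket/a.txt", "gs://mybucket/b.txt"));
   PCollection<String> allLines = files.apply(TextIO.readAll());
   ```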



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file
+
+The `TextIO` class in Apache Beam provides a way to read and write text files from **Google Cloud Storage (GCS)** in a pipeline. To read a text file from GCS using TextIO, you can use the `Read` method and pass in the GCS file path as a string, which starts with the "**gs://**" prefix. Here is an example of reading a text file named "**myfile.txt**" from a GCS bucket named "**mybucket**" and printing its contents:
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "gs://mybucket/myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(TextIO.read().from("gs://mybucket/myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+pipeline.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+p | beam.io.ReadFromText('gs://mybucket/myfile.txt') | beam.Map(print)
+p.run()
+```
+{{end}}

Review Comment:
   ### Playground Exercise
   In the playground window, you can find an example that reads from a text 
file and outputs individual words found in the text. Can you modify this 
example to read from multiple files matching patterns and watch for new files 
added? 
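
   A minimal Java sketch of the watching part (the pattern, poll interval, and termination condition are illustrative):

   ```
   // Keep matching the filepattern and pick up new files as they appear,
   // polling every 30 seconds and never terminating the watch.
   PCollection<String> lines =
       p.apply(TextIO.read()
           .from("gs://mybucket/incoming/*.txt")
           .watchForNewFiles(Duration.standardSeconds(30), Watch.Growth.never()));
   ```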



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,59 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery table

Review Comment:
   ### Reading from BigQuery table



##########
learning/tour-of-beam/learning-content/IO/big-query-io/table-schema/description.md:
##########
@@ -0,0 +1,121 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery with table-schema
+
+{{if (eq .Sdk "java")}}
+In Apache Beam, the `BigQueryIO` package provides the ability to read from and write to Google `BigQuery`. To use this package, you need to define a table schema for your BigQuery table, which specifies the names, data types, and modes of the columns in the table.
+```
+type User struct {
+       ID   int32  `bigquery:"id"`
+       Name string `bigquery:"name"`
+       Age  int32  `bigquery:"age"`
+}
+
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID},
+               beam.WithSchema(User{}))
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+`DynamicDestinations` is a feature provided by the `BigQueryIO` class in Apache Beam that allows you to write data to different BigQuery tables based on the input elements. The feature allows you to specify a function that takes an input element and returns the destination table information (table name, schema, etc.) for that element.
+
+The `DynamicDestinations` interface provided by the `BigQueryIO` class in Apache Beam has three methods:
+
+* `getDestination`: takes an input element and returns a destination key for that element.
+* `getTable`: takes a destination key and returns a `TableDestination` object, which contains the information about the destination table.
+* `getSchema`: takes a destination key and returns the schema as a `TableSchema` object.
+
+Here is an example of how you might use the `BigQueryIO.write()` method with `DynamicDestinations` to write data to different BigQuery tables based on the input elements:
+
+```
+weatherData.apply(
+    BigQueryIO.<WeatherData>write()
+        .to(
+            new DynamicDestinations<WeatherData, Long>() {
+              @Override
+              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
+                return elem.getValue().year;
+              }
+
+              @Override
+              public TableDestination getTable(Long destination) {
+                return new TableDestination(
+                    new TableReference()
+                        .setProjectId(writeProject)
+                        .setDatasetId(writeDataset)
+                        .setTableId(writeTable + "_" + destination),
+                    "Table for year " + destination);
+              }
+
+              @Override
+              public TableSchema getSchema(Long destination) {
+                return new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema()
+                                .setName("year")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("month")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("day")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("maxTemp")
+                                .setType("FLOAT")
+                                .setMode("NULLABLE")));
+              }
+            })
+        .withFormatFunction(
+            (WeatherData elem) ->
+                new TableRow()
+                    .set("year", elem.year)
+                    .set("month", elem.month)
+                    .set("day", elem.day)
+                    .set("maxTemp", elem.maxTemp))
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
+```
+{{end}}
+

Review Comment:
   What about golang? There is an example but no learning material



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+       expansionAddr = flag.String("expansion_addr", "",
+               "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+       bootstrapServers = flag.String("bootstrap_servers", "",
+               "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+       topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+p.apply("ReadFromKafka",
+                        KafkaIO.<String, String>read()
+                                .withBootstrapServers("localhost:29092")
+                                .withTopicPartitions(
+                                        Collections.singletonList(
+                                                new TopicPartition(
+                                                        "NYCTaxi1000_simple",
+                                .withKeyDeserializer(StringDeserializer.class)
+                                
.withValueDeserializer(StringDeserializer.class)
+                                .withConsumerConfigUpdates(consumerConfig)
+                                .withMaxNumRecords(998)
+                                .withoutMetadata())
+```
+{{end}}
+
+
+{{if (eq .Sdk "python")}}
+```
+input_topic = 'input-topic'
+output_topic = 'output-topic'
+
+(p | "Read from Kafka" >> ReadFromKafka(
+      consumer_config={'bootstrap.servers': 'localhost:9092'},
+      topics=[input_topic])
+ | "Process data" >> beam.Map(process_data))
+```
+{{end}}

Review Comment:
   Please add the playground exercise description; the challenge is missing as well



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   ### Writing to Kafka using KafkaIO



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+

Review Comment:
   >  KafkaIO returns an unbounded collection of Kafka records alongside 
metadata such as topic-partition and offset
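
   A short Java sketch of that point (topic and servers are placeholders): without `.withoutMetadata()`, the read yields `KafkaRecord` elements carrying that metadata:

   ```
   PCollection<KafkaRecord<String, String>> records =
       p.apply(KafkaIO.<String, String>read()
           .withBootstrapServers("localhost:9092")
           .withTopic("my-topic")
           .withKeyDeserializer(StringDeserializer.class)
           .withValueDeserializer(StringDeserializer.class));
   // Each KafkaRecord exposes getTopic(), getPartition(), and getOffset().
   ```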



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file

Review Comment:
   ### Reading from Google Cloud Storage text file using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-write/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file

Review Comment:
   Why duplicate the text-io-local-read topic? Let's remove this one



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/go-example/main.go:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beam-playground:
+//   name: text-io-local-read
+//   description: TextIO read local file example.
+//   multifile: false
+//   context_line: 30
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+
+package main
+
+import (
+       "regexp"

Review Comment:
   Spacing



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file

Review Comment:
   ### Writing to Google Cloud Storage text file using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file
+
+The `TextIO` class in Apache Beam provides a way to read and write text files from **Google Cloud Storage (GCS)** in a pipeline. To read a text file from GCS using TextIO, you can use the `Read` method and pass in the GCS file path as a string, which starts with the "**gs://**" prefix. Here is an example of reading a text file named "**myfile.txt**" from a GCS bucket named "**mybucket**" and printing its contents:
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "gs://mybucket/myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(TextIO.read().from("gs://mybucket/myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+pipeline.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+p | beam.io.ReadFromText('gs://mybucket/myfile.txt') | beam.Map(print)
+p.run()
+```
+{{end}}

Review Comment:
   Please illustrate how to configure/pass GCP credentials. Also please add sections related to reading from multiple files, using file patterns and watching for new files
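
   For the credentials part, a hedged Java sketch (the project id is a placeholder; by default Beam picks up Application Default Credentials from the GOOGLE_APPLICATION_CREDENTIALS environment variable):

   ```
   // Configure the GCP project and, if needed, explicit credentials.
   GcpOptions options = PipelineOptionsFactory.fromArgs(args).as(GcpOptions.class);
   options.setProject("my-project-id");
   options.setGcpCredential(GoogleCredentials.getApplicationDefault());
   Pipeline p = Pipeline.create(options);
   ```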



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,42 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery query results
+
+Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.Read()` method is used to read data from a `BigQuery` table based on a **SQL query**.
+The `BigQueryIO.Read()` method reads data from a `BigQuery` table in parallel by automatically splitting the query into smaller pieces and running each piece in a separate `BigQuery` job. This can improve performance for large tables, but can also increase the cost of running your pipeline.
+
+{{if (eq .Sdk "go")}}
+```
+bigquery.NewClient(context.Background(), options).Read(p,
+               bigquery.Query("SELECT max_temperature FROM `tess-372508.fir.xasw`"),
+               bigquery.WithCoder(bigquery.Float64()))
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<Double> maxTemperatures =
+    p.apply(
+        BigQueryIO.read(
+                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+            .fromQuery(
+                "SELECT max_temperature FROM `tess-372508.fir.xasw`")
+            .usingStandardSql()
+            .withCoder(DoubleCoder.of()));
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT max_temperature FROM `tess-372508.fir.xasw`', use_standard_sql=True))
+```
+{{end}}

Review Comment:
   Runnable example description/challenge is missing



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO Google Cloud Storage file
+
+The `TextIO` class in Apache Beam provides a way to read and write text files from **Google Cloud Storage (GCS)** in a pipeline.
+To write data to a file on **GCS**, you can use the `Write` method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := beam.Create(s, "Hello, World!")
+textio.Write(s, "gs://mybucket/myfile.txt", lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("gs://mybucket/myfile.txt"));
+pipeline.run();
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt')
+p.run()
+```
+{{end}}
+
+{{if (eq .Sdk "go")}}
+It is important to note that in order to interact with GCS you will need to set up authentication. The Go SDK uses Application Default Credentials, so you can point the **GOOGLE_APPLICATION_CREDENTIALS** environment variable at a service account key file (or run `gcloud auth application-default login`) before starting the pipeline:
+
+```
+export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication. You can do that by setting the **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by configuring `GoogleCloudOptions` on your pipeline options and supplying credentials, for example:
+
+```
+from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
+from oauth2client.client import GoogleCredentials
+
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://my-bucket/staging'
+google_cloud_options.temp_location = 'gs://my-bucket/temp'
+google_cloud_options.region = 'us-central1'
+
+# set credentials
+credentials = GoogleCredentials.get_application_default()
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication and specify a GCS temp location, which you can pass on the command line as an additional parameter:

Review Comment:
   Seems to be an incomplete description of how to set up GCP credentials 
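
   One way it could be rounded out, as a hedged Java sketch (flag values are illustrative):

   ```
   // Typical GCP-related flags passed on the command line:
   //   --project=my-project-id --region=us-central1 --tempLocation=gs://my-bucket/temp
   // Credentials are read from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
   Pipeline pipeline = Pipeline.create(options);
   ```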



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,42 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery query results
+
+Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.Read()` method is used to read data from a `BigQuery` table based on a **SQL query**.

Review Comment:
   You can use the `BigQueryIO` connector to execute a **SQL query** and read its results. Like reading from a table, you need to use the `BigQueryIO.Read()` method but specify a **SQL query** instead of a table name. 
                
   > The `BigQueryIO.Read()` method reads data from a `BigQuery` table in 
parallel by automatically splitting the query into smaller pieces and running 
each piece in a separate `BigQuery` job. This can improve large tables' 
performance but also increase the cost of running your pipeline.
   



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,59 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery table
+
+`BigQueryIO` allows you to read from a `BigQuery` table and process the results. By default, Beam invokes a `BigQuery` export request when you apply a `BigQueryIO` read transform. In the Java Beam SDK, `readTableRows` returns a `PCollection` of `BigQuery` `TableRow` objects. Each element in the `PCollection` represents a single row in the table.
+
+> `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient but has a performance impact. Alternatively, you can use the `read(SerializableFunction)` method to avoid this.
+
+{{if (eq .Sdk "go")}}
+
+```
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID})
+beam.ParDo0(s, &logOutput{}, rows)
+```
+
+The `bigqueryio.Read()` method is called with a `bigquery.TableReference` object that specifies the project, dataset, and table IDs for the `BigQuery` table to read from.
+
+The `Read()` method returns a `PCollection` of `TableRow` objects, which represent the rows of data in the BigQuery table.

Review Comment:
   The `Read()` method returns a PCollection of `TableRow` objects, 
representing the data rows in the BigQuery table.
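
   For the Java side, a minimal sketch (the table name is a placeholder):

   ```
   // readTableRows exports the table and yields one TableRow per row.
   PCollection<TableRow> rows =
       p.apply(BigQueryIO.readTableRows().from("my-project:my_dataset.my_table"));
   ```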



##########
learning/tour-of-beam/learning-content/IO/big-query-io/table-schema/description.md:
##########
@@ -0,0 +1,121 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery with table-schema
+
+{{if (eq .Sdk "java")}}
+In Apache Beam, the `BigQueryIO` package provides the ability to read from and write to Google `BigQuery`. To use this package, you need to define a table schema for your BigQuery table, which specifies the names, data types, and modes of the columns in the table.
+```
+type User struct {
+       ID   int32  `bigquery:"id"`
+       Name string `bigquery:"name"`
+       Age  int32  `bigquery:"age"`
+}
+
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID},
+               beam.WithSchema(User{}))
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+`DynamicDestinations` is a feature provided by the `BigQueryIO` class in Apache Beam that allows you to write data to different BigQuery tables based on the input elements. The feature allows you to specify a function that takes an input element and returns the destination table information (table name, schema, etc.) for that element.
+
+The `DynamicDestinations` interface provided by the `BigQueryIO` class in Apache Beam has three methods:
+
+* `getDestination`: takes an input element and returns a destination key for that element.
+* `getTable`: takes a destination key and returns a `TableDestination` object, which contains the information about the destination table.
+* `getSchema`: takes a destination key and returns the schema as a `TableSchema` object.
+
+Here is an example of how you might use the `BigQueryIO.write()` method with `DynamicDestinations` to write data to different BigQuery tables based on the input elements:
+
+```
+weatherData.apply(
+    BigQueryIO.<WeatherData>write()
+        .to(
+            new DynamicDestinations<WeatherData, Long>() {
+              @Override
+              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
+                return elem.getValue().year;
+              }
+
+              @Override
+              public TableDestination getTable(Long destination) {
+                return new TableDestination(
+                    new TableReference()
+                        .setProjectId(writeProject)
+                        .setDatasetId(writeDataset)
+                        .setTableId(writeTable + "_" + destination),
+                    "Table for year " + destination);
+              }
+
+              @Override
+              public TableSchema getSchema(Long destination) {
+                return new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema()
+                                .setName("year")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("month")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("day")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("maxTemp")
+                                .setType("FLOAT")
+                                .setMode("NULLABLE")));
+              }
+            })
+        .withFormatFunction(
+            (WeatherData elem) ->
+                new TableRow()
+                    .set("year", elem.year)
+                    .set("month", elem.month)
+                    .set("day", elem.day)
+                    .set("maxTemp", elem.maxTemp))
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.
+
+The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination.
+
+In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.
+
+```
+fictional_characters_view = beam.pvalue.AsDict(
+    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
+                                                  ('Obi Wan Kenobi', True)]))
+
+def table_fn(element, fictional_characters):
+  if element in fictional_characters:
+    return 'my_dataset.fictional_quotes'
+  else:
+    return 'my_dataset.real_quotes'
+
+quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
+    table_fn,
+    schema=table_schema,
+    table_side_inputs=(fictional_characters_view, ),
+    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
+    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
+```
+{{end}}

Review Comment:
   ### Playground Exercise
   example description and challenge are missing



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+       expansionAddr = flag.String("expansion_addr", "",
+               "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+       bootstrapServers = flag.String("bootstrap_servers", "",
+               "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+       topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}

Review Comment:
   Please describe Kafka configuration



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.

Review Comment:
   KafkaIO is a part of the Apache Beam SDK that provides a way to read data 
from Apache Kafka and write data to it. It allows for creating Beam pipelines 
that can consume data from a Kafka topic, process the data and write the 
processed data back to another Kafka topic. This makes it possible to build 
data processing pipelines using Apache Beam that can easily integrate with a 
Kafka-based data architecture.



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   ### Reading from Kafka using KafkaIO



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+       expansionAddr = flag.String("expansion_addr", "",
+               "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+       bootstrapServers = flag.String("bootstrap_servers", "",
+               "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+       topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+p.apply("ReadFromKafka",
+                        KafkaIO.<String, String>read()
+                                .withBootstrapServers("localhost:29092")
+                                .withTopicPartitions(
+                                        Collections.singletonList(
+                                                new TopicPartition(
+                                                        "NYCTaxi1000_simple",
+                                .withKeyDeserializer(StringDeserializer.class)
+                                
.withValueDeserializer(StringDeserializer.class)
+                                .withConsumerConfigUpdates(consumerConfig)
+                                .withMaxNumRecords(998)
+                                .withoutMetadata())
+```
+{{end}}

Review Comment:
   Please describe Kafka configuration
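
   As a starting point, a hedged sketch of what `consumerConfig` could contain (the keys are standard Kafka consumer properties; the values are illustrative):

   ```
   // Extra consumer properties merged into KafkaIO's defaults
   // via withConsumerConfigUpdates().
   Map<String, Object> consumerConfig = new HashMap<>();
   consumerConfig.put("auto.offset.reset", "earliest"); // read from the beginning
   consumerConfig.put("group.id", "beam-consumer");     // consumer group id
   ```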



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data, and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.

Review Comment:
   This is already described in Kafka read unit, please describe how to write 
to Kafka
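
   A minimal Java sketch of the write side (topic, servers, and the input collection are placeholders):

   ```
   // Write key/value pairs from a PCollection<KV<String, String>> to a Kafka topic.
   kvPairs.apply(KafkaIO.<String, String>write()
       .withBootstrapServers("localhost:9092")
       .withTopic("output-topic")
       .withKeySerializer(StringSerializer.class)
       .withValueSerializer(StringSerializer.class));
   ```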



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
