Re: java.lang.ClassCastException: class java.lang.String cannot be cast to class...

2024-03-17 Thread Jaehyeon Kim
Thanks. It works after specifying the output type.


Re: java.lang.ClassCastException: class java.lang.String cannot be cast to class...

2024-03-17 Thread XQ Hu via user
Here is what I did, including how I set up the portable runner with Flink:

1. Start the local Flink cluster.
2. Start the Flink job server and point it to that local cluster:
   docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
3. Use these pipeline options in the code:
   options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True)
4. The key, I think, is to explicitly specify the output type for TestStream, like this:
   TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)

These at least work for me.


java.lang.ClassCastException: class java.lang.String cannot be cast to class...

2024-03-14 Thread Jaehyeon Kim
Hello,

I am trying a simple word count pipeline in a streaming environment using
TestStream (Python SDK). While it works with the DirectRunner, it fails on
the FlinkRunner with the following error. It looks like a type casting
issue.

Traceback (most recent call last):
  File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 78, in <module>
    run()
  File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 74, in run
    p.run().wait_until_finish()
  File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7 failed in state FAILED: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

Could you please tell me how to fix it? The pipeline code is shown below.

Cheers,
Jaehyeon

import os
import datetime
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name that event records are saved",
    )
    parser.add_argument(
        "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()
    # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = opts.runner

    lines = [
        "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos hymenaeos.",
        "Duis pulvinar. Integer pellentesque quam vel velit. Sed convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.",
    ]
    # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
    now = int(datetime.datetime.now().timestamp() * 1000)
    test_stream = (
        TestStream(coder=coders.StrUtf8Coder())
        .add_elements(
            [TimestampedValue(lines[i], now + 1000) for i in range(len(lines))]
        )
        .advance_watermark_to_infinity()
    )

    p = beam.Pipeline(options=options)
    (
        p
        | "Read stream" >> test_stream
        | "Extract words" >> beam.FlatMap(tokenize)
        | "Windowing"
        >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | "Count per word" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
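
As a sanity check when comparing the DirectRunner and FlinkRunner outputs, the per-word counts that the pipeline above is meant to produce can also be computed in plain Python. This is only a reference sketch and not part of the pipeline: it reuses the tokenize function from the code above, while expected_counts is a hypothetical helper name.

```python
import re
from collections import Counter


def tokenize(element: str):
    # Same regular expression as in the pipeline code above.
    return re.findall(r"[A-Za-z\']+", element)


def expected_counts(lines):
    """Count every token across all input lines, without Beam."""
    counts = Counter()
    for line in lines:
        counts.update(tokenize(line))
    return counts
```

Calling expected_counts(lines) with the same list that feeds the TestStream gives a mapping from word to count that the printed (word, count) pairs can be checked against.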