Re: java.lang.ClassCastException: class java.lang.String cannot be cast to class...
Thanks. It works after specifying the output type.

On Mon, 18 Mar 2024 at 01:44, XQ Hu via user wrote:
> 4. The key I think is to explicitly specify the output types for
> TestStream like this: TestStream(coder=coders.StrUtf8Coder())
> .with_output_types(str)
Re: java.lang.ClassCastException: class java.lang.String cannot be cast to class...
Here is what I did, including how I set up the portable runner with Flink:

1. Start the local Flink cluster.
2. Start the Flink job server and point it to that local cluster:
   docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
3. Use these pipeline options in the code:
   options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True)
4. The key, I think, is to explicitly specify the output types for TestStream like this:
   TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)

These steps work for me, at least.

On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim wrote:
> I am trying a simple word count pipeline in a streaming environment using
> TestStream (Python SDK). While it works with the DirectRunner, it fails on
> the FlinkRunner with the following error.
java.lang.ClassCastException: class java.lang.String cannot be cast to class...
Hello,

I am trying a simple word count pipeline in a streaming environment using TestStream (Python SDK). It works with the DirectRunner, but it fails on the FlinkRunner with the following error, which looks like a type-casting issue.

Traceback (most recent call last):
  File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 78, in <module>
    run()
  File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 74, in run
    p.run().wait_until_finish()
  File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7 failed in state FAILED: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

Can you please tell me how to fix it? The pipeline code is below.
Cheers,
Jaehyeon

import os
import datetime
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name that event records are saved",
    )
    parser.add_argument(
        "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()
    # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = opts.runner

    lines = [
        "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos hymenaeos."
        "Duis pulvinar. Integer pellentesque quam vel velit. Sed convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus."
    ]
    # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
    now = int(datetime.datetime.now().timestamp() * 1000)
    test_stream = (
        TestStream(coder=coders.StrUtf8Coder())
        .add_elements(
            [TimestampedValue(lines[i], now + 1000) for i in range(len(lines))]
        )
        .advance_watermark_to_infinity()
    )

    p = beam.Pipeline(options=options)
    (
        p
        | "Read stream" >> test_stream
        | "Extract words" >> beam.FlatMap(tokenize)
        | "Windowing" >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | "Count per word" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()