Re: A problem with ZetaSQL

2021-03-04 Thread Brian Hulette
Ah, I suspect this is because our ZetaSQL planner only supports 64-bit
integers (see
https://beam.apache.org/documentation/dsls/sql/zetasql/data-types/#integer-type).
+Robin Qiu, maybe we should have a better error message for this?
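
A minimal sketch of one possible workaround, assuming the PCollection<Row> built from avroSchema is named input: widen any INT32 fields to INT64 before handing the rows to SqlTransform, since the ZetaSQL planner only understands 64-bit integers. This is illustrative only, not code from the thread.

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

// Sketch: promote INT32 fields to INT64 so the ZetaSQL catalog accepts the schema.
Schema original = AvroUtils.toBeamSchema(avroSchema);
Schema.Builder widenedBuilder = Schema.builder();
for (Schema.Field f : original.getFields()) {
  if (f.getType().getTypeName() == Schema.TypeName.INT32) {
    // Assumes nullable fields, as in the schema shown in the quoted message below.
    widenedBuilder.addField(f.getName(), Schema.FieldType.INT64.withNullable(true));
  } else {
    widenedBuilder.addField(f.getName(), f.getType());
  }
}
Schema widened = widenedBuilder.build();

PCollection<Row> widenedInput = input
    .apply(MapElements.into(TypeDescriptors.rows())
        .via((Row r) -> {
          Row.Builder b = Row.withSchema(widened);
          for (Schema.Field f : widened.getFields()) {
            Object v = r.getValue(f.getName());
            // Integer values are converted to Long; everything else passes through.
            b.addValue(v instanceof Integer ? ((Integer) v).longValue() : v);
          }
          return b.build();
        }))
    .setRowSchema(widened);

widenedInput.apply(SqlTransform.query("SELECT * FROM PCOLLECTION"));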

On Thu, Mar 4, 2021 at 5:24 PM Tao Li  wrote:

> Brian the schema is really simple. Just 3 primitive type columns:
>
>
>
> root
>
> |-- column_1: integer (nullable = true)
>
> |-- column_2: integer (nullable = true)
>
> |-- column_3: string (nullable = true)
>
>
>
>
>
> *From: *Brian Hulette 
> *Date: *Thursday, March 4, 2021 at 2:29 PM
> *To: *Tao Li 
> *Cc: *"user@beam.apache.org" 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks, it would also be helpful to know what avroSchema is, or at least
> the types of its fields, so we can understand what the schema of the
> PCollection is.
>
>
>
> On Tue, Mar 2, 2021 at 11:00 AM Tao Li  wrote:
>
> Hi Brian,
>
>
>
> Here is my code to create the PCollection.
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>     .apply(FileIO.match().filepattern(path))
>     .apply(FileIO.readMatches());
>
> PCollection<Row> input = files
>     .apply(ParquetIO.readFiles(avroSchema))
>     .apply(MapElements
>         .into(TypeDescriptors.rows())
>         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, March 2, 2021 at 10:31 AM
> *To: *user 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks for reporting this, Tao - could you share what the type of your
> input PCollection is?
>
>
>
> On Tue, Mar 2, 2021 at 9:33 AM Tao Li  wrote:
>
> Hi all,
>
>
>
> I was following the instructions from this doc to play with ZetaSQL
> https://beam.apache.org/documentation/dsls/sql/overview/
> 
>
>
>
> The query is really simple:
>
>
>
> options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
>
> input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))
>
>
>
> I am seeing this error with ZetaSQL:
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unknown Calcite type: INTEGER
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
>
> at
> com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>
>
>
> This query works fine when using Calcite (by just removing the setPlannerName
> call). Am I missing anything here? For example, I am specifying
> 'com.google.guava:guava:23.0' as a dependency.
>
>
>
> Thanks!
>
>
>
>
>
>


Re: A problem with ZetaSQL

2021-03-04 Thread Tao Li
Brian the schema is really simple. Just 3 primitive type columns:

root
|-- column_1: integer (nullable = true)
|-- column_2: integer (nullable = true)
|-- column_3: string (nullable = true)


From: Brian Hulette 
Date: Thursday, March 4, 2021 at 2:29 PM
To: Tao Li 
Cc: "user@beam.apache.org" 
Subject: Re: A problem with ZetaSQL

Thanks, it would also be helpful to know what avroSchema is, or at least the
types of its fields, so we can understand what the schema of the PCollection is.

On Tue, Mar 2, 2021 at 11:00 AM Tao Li <t...@zillow.com> wrote:
Hi Brian,

Here is my code to create the PCollection.

PCollection<FileIO.ReadableFile> files = pipeline
    .apply(FileIO.match().filepattern(path))
    .apply(FileIO.readMatches());

PCollection<Row> input = files
    .apply(ParquetIO.readFiles(avroSchema))
    .apply(MapElements
        .into(TypeDescriptors.rows())
        .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
    .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));


From: Brian Hulette <bhule...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Tuesday, March 2, 2021 at 10:31 AM
To: user <user@beam.apache.org>
Subject: Re: A problem with ZetaSQL

Thanks for reporting this, Tao - could you share what the type of your input
PCollection is?

On Tue, Mar 2, 2021 at 9:33 AM Tao Li <t...@zillow.com> wrote:
Hi all,

I was following the instructions from this doc to play with ZetaSQL  
https://beam.apache.org/documentation/dsls/sql/overview/

The query is really simple:

options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))

I am seeing this error with ZetaSQL:

Exception in thread "main" java.lang.UnsupportedOperationException: Unknown 
Calcite type: INTEGER
at 
org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
at 
org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
at 
org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
at 
org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
at 
com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
at 
org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
at 
org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
at 
org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
at 
org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
at 
org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
at 
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
at 
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)

This query works fine when using Calcite (by just removing the setPlannerName
call). Am I missing anything here? For example, I am specifying
'com.google.guava:guava:23.0' as a dependency.

Thanks!




Re: A problem with ZetaSQL

2021-03-04 Thread Brian Hulette
Thanks, it would also be helpful to know what avroSchema is, or at least
the types of its fields, so we can understand what the schema of the
PCollection is.

On Tue, Mar 2, 2021 at 11:00 AM Tao Li  wrote:

> Hi Brian,
>
>
>
> Here is my code to create the PCollection.
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>     .apply(FileIO.match().filepattern(path))
>     .apply(FileIO.readMatches());
>
> PCollection<Row> input = files
>     .apply(ParquetIO.readFiles(avroSchema))
>     .apply(MapElements
>         .into(TypeDescriptors.rows())
>         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, March 2, 2021 at 10:31 AM
> *To: *user 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks for reporting this, Tao - could you share what the type of your
> input PCollection is?
>
>
>
> On Tue, Mar 2, 2021 at 9:33 AM Tao Li  wrote:
>
> Hi all,
>
>
>
> I was following the instructions from this doc to play with ZetaSQL
> https://beam.apache.org/documentation/dsls/sql/overview/
> 
>
>
>
> The query is really simple:
>
>
>
> options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
>
> input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))
>
>
>
> I am seeing this error with ZetaSQL:
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unknown Calcite type: INTEGER
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
>
> at
> com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>
>
>
> This query works fine when using Calcite (by just removing the setPlannerName
> call). Am I missing anything here? For example, I am specifying
> 'com.google.guava:guava:23.0' as a dependency.
>
>
>
> Thanks!
>
>
>
>
>
>


Re: Does writeDynamic() support writing different element groups to different output paths?

2021-03-04 Thread Tao Li
I was able to resolve the “unable to deserialize FileBasedSink” error by adding
withNumShards().

inputData.apply(FileIO.writeDynamic()
    .by(record -> "test")
    .withDestinationCoder(StringUtf8Coder.of())
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNumShards(10)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
        @Override
        public FileIO.Write.FileNaming apply(String input) {
            // "naming" is a FileIO.Write.FileNaming defined elsewhere in the pipeline.
            return FileIO.Write.relativeFileNaming(
                ValueProvider.StaticValueProvider.of(outputPath + "/" + input), naming);
        }
    }));

Now I am seeing a new error as below. Is this related to 
https://issues.apache.org/jira/browse/BEAM-9868? I don’t quite understand what 
this error means. Please advise.

Exception in thread "main" java.lang.IllegalArgumentException: unable to 
deserialize Custom DoFn With Execution Info
at 
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:709)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
at 
org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:87)
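
For what it's worth, one frequent source of "unable to deserialize" errors in ParDo translation is an anonymous inner class that drags its (possibly non-serializable) enclosing class into the serialized closure. Below is a minimal sketch of a capture-free naming function, assuming outputPath is a String and naming is a FileIO.Write.FileNaming defined elsewhere; it is illustrative only and not a confirmed fix for the error above.

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SimpleFunction;

// A standalone class only serializes its own fields, unlike an anonymous inner
// class, which also serializes a reference to its enclosing instance.
// (If nested inside the pipeline class, declare it static.)
class PartitionNaming extends SimpleFunction<String, FileIO.Write.FileNaming> {
  private final String outputPath;
  private final FileIO.Write.FileNaming naming;

  PartitionNaming(String outputPath, FileIO.Write.FileNaming naming) {
    this.outputPath = outputPath;
    this.naming = naming;
  }

  @Override
  public FileIO.Write.FileNaming apply(String partition) {
    // Write files for this partition under outputPath/<partition>/ using the inner naming.
    return FileIO.Write.relativeFileNaming(
        ValueProvider.StaticValueProvider.of(outputPath + "/" + partition), naming);
  }
}

// Usage: .withNaming(new PartitionNaming(outputPath, naming))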

From: Tao Li 
Reply-To: "user@beam.apache.org" 
Date: Thursday, March 4, 2021 at 11:52 AM
To: "user@beam.apache.org" , Kobe Feng 

Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

I tried the code below:

inputData.apply(FileIO.writeDynamic()
    .by(record -> "test")
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
        @Override
        public FileIO.Write.FileNaming apply(String input) {
            return FileIO.Write.relativeFileNaming(
                ValueProvider.StaticValueProvider.of(outputPath + "/" + input), naming);
        }
    })
    .withDestinationCoder(StringUtf8Coder.of()));

Exception in thread "main" java.lang.IllegalArgumentException: unable to 
deserialize FileBasedSink
at 
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.sinkFromProto(WriteFilesTranslation.java:125)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.getSink(WriteFilesTranslation.java:137)
at 
org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:69)
at 
org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:564)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:299)

When I switch to using the write() API as below, it works fine. Does anyone have any
ideas? Thanks!

inputData.apply(FileIO.write()
    .withNumShards(10)
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withSuffix(".parquet"));


From: Tao Li 
Reply-To: "user@beam.apache.org" 
Date: Thursday, March 4, 2021 at 9:36 AM
To: "user@beam.apache.org" , Kobe Feng 

Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

Thanks Kobe, let me give it a try!

From: Kobe Feng 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, March 3, 2021 at 9:33 PM
To: "user@beam.apache.org" 
Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

I used the following approach a long time ago for writing into partitions in HDFS
(there may be better solutions from others), and I'm not sure whether any interfaces
have changed, which you'll need to check:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix, 
FileIO.writeDynamic[String, GenericRecord]()
  .by(recordPartition.partitionFunc)
  .withDestinationCoder(StringUtf8Coder.of())
  .via(DarwinParquetIO.sink(...)
  .to(baseDir)
   ...
  .withNaming((partitionFolder: String) => 
relativeFileNaming(StaticValueProvider.of[String](baseDir + Path.SEPARATOR + 
partitionFolder), fileNaming))
   ...

val partitionFunc: T => String



A good practice is to auto-switch: use an event-time field from the record value for
partitioning when using event-time windows, and processing time otherwise.

And partitionFunc could consider multiple partition columns to build subdirectories
based on your file system's path separator, e.g. S3.

Re: Does writeDynamic() support writing different element groups to different output paths?

2021-03-04 Thread Tao Li
I tried the code below:

inputData.apply(FileIO.writeDynamic()
    .by(record -> "test")
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
        @Override
        public FileIO.Write.FileNaming apply(String input) {
            return FileIO.Write.relativeFileNaming(
                ValueProvider.StaticValueProvider.of(outputPath + "/" + input), naming);
        }
    })
    .withDestinationCoder(StringUtf8Coder.of()));

Exception in thread "main" java.lang.IllegalArgumentException: unable to 
deserialize FileBasedSink
at 
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.sinkFromProto(WriteFilesTranslation.java:125)
at 
org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.getSink(WriteFilesTranslation.java:137)
at 
org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:69)
at 
org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:564)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:299)

When I switch to using the write() API as below, it works fine. Does anyone have any
ideas? Thanks!

inputData.apply(FileIO.write()
    .withNumShards(10)
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withSuffix(".parquet"));


From: Tao Li 
Reply-To: "user@beam.apache.org" 
Date: Thursday, March 4, 2021 at 9:36 AM
To: "user@beam.apache.org" , Kobe Feng 

Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

Thanks Kobe, let me give it a try!

From: Kobe Feng 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, March 3, 2021 at 9:33 PM
To: "user@beam.apache.org" 
Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

I used the following approach a long time ago for writing into partitions in HDFS
(there may be better solutions from others), and I'm not sure whether any interfaces
have changed, which you'll need to check:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix, 
FileIO.writeDynamic[String, GenericRecord]()
  .by(recordPartition.partitionFunc)
  .withDestinationCoder(StringUtf8Coder.of())
  .via(DarwinParquetIO.sink(...)
  .to(baseDir)
   ...
  .withNaming((partitionFolder: String) => 
relativeFileNaming(StaticValueProvider.of[String](baseDir + Path.SEPARATOR + 
partitionFolder), fileNaming))
   ...

val partitionFunc: T => String



A good practice is to auto-switch: use an event-time field from the record value for
partitioning when using event-time windows, and processing time otherwise.

And partitionFunc could consider multiple partition columns to build subdirectories
based on your file system's path separator, e.g. S3.
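
To make the hourly layout concrete, here is a minimal Java sketch along the same lines. The names records, avroSchema, outputPath, naming, and the event_time field are placeholders rather than code from this thread, and the element type is assumed to be GenericRecord:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.ValueProvider;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

// Sketch: route each record to an hour-named subdirectory under outputPath.
records.apply(FileIO.<String, GenericRecord>writeDynamic()
    // Destination = hour bucket such as "2021-03-04-17", derived here from a
    // hypothetical "event_time" field holding epoch millis.
    .by(record -> DateTimeFormat.forPattern("yyyy-MM-dd-HH").withZoneUTC()
        .print(new Instant((long) record.get("event_time"))))
    .withDestinationCoder(StringUtf8Coder.of())
    .via(ParquetIO.sink(avroSchema))
    .to(outputPath)
    .withNumShards(10)
    // Each destination (hour) gets its own relative base directory.
    .withNaming(hour -> FileIO.Write.relativeFileNaming(
        ValueProvider.StaticValueProvider.of(outputPath + "/" + hour), naming)));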

On Wed, Mar 3, 2021 at 5:36 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I have a streaming app that writes every hour’s data to a folder named after that
hour. With Flink (for example), we can leverage the “Bucketing File Sink”:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html

However, I am not seeing that Beam FileIO’s writeDynamic API supports specifying
different output paths for different groups:
https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html

It seems like writeDynamic() only supports specifying a different naming strategy.

How can I specify different hourly output paths for hourly data with Beam
writeDynamic? Please advise. Thanks!




--
Yours Sincerely
Kobe Feng


Re: using python sdk+kafka under k8s

2021-03-04 Thread Kyle Weaver
The problem is that Kafka is a "cross-language" transform that is
implemented in Java. You have configured your Python pipeline to run with
environment_type=EXTERNAL. However, the Kafka transform has its own
environment with environment_type=DOCKER; it does not respect the
environment_type you set for the pipeline. Currently I don't think there's
a way to configure the environment for an external transform; I brought up
this issue in a recent thread [1]. The reason for the error you are seeing
is that environment_type=DOCKER tries to start up Docker inside your Flink
workers, which must not have Docker installed.

[1]
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang  wrote:

> Hmm, looks like I may be failing due to the Docker environment:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in
> _run_module_as_main
> "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
> exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in 
> run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
> |beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 583, in __exit__
> self.result.wait_until_finish()
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 581, in wait_until_finish
> raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
> state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:451)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:436)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
> at
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.

Re: Does writeDynamic() support writing different element groups to different output paths?

2021-03-04 Thread Tao Li
Thanks Kobe, let me give it a try!

From: Kobe Feng 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, March 3, 2021 at 9:33 PM
To: "user@beam.apache.org" 
Cc: Yuchu Cao 
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

I used the following approach a long time ago for writing into partitions in HDFS
(there may be better solutions from others), and I'm not sure whether any interfaces
have changed, which you'll need to check:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix, 
FileIO.writeDynamic[String, GenericRecord]()
  .by(recordPartition.partitionFunc)
  .withDestinationCoder(StringUtf8Coder.of())
  .via(DarwinParquetIO.sink(...)
  .to(baseDir)
   ...
  .withNaming((partitionFolder: String) => 
relativeFileNaming(StaticValueProvider.of[String](baseDir + Path.SEPARATOR + 
partitionFolder), fileNaming))
   ...

val partitionFunc: T => String



A good practice is to auto-switch: use an event-time field from the record value for
partitioning when using event-time windows, and processing time otherwise.

And partitionFunc could consider multiple partition columns to build subdirectories
based on your file system's path separator, e.g. S3.

On Wed, Mar 3, 2021 at 5:36 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I have a streaming app that writes every hour’s data to a folder named after that
hour. With Flink (for example), we can leverage the “Bucketing File Sink”:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html

However, I am not seeing that Beam FileIO’s writeDynamic API supports specifying
different output paths for different groups:
https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html

It seems like writeDynamic() only supports specifying a different naming strategy.

How can I specify different hourly output paths for hourly data with Beam
writeDynamic? Please advise. Thanks!




--
Yours Sincerely
Kobe Feng


Re: using python sdk+kafka under k8s

2021-03-04 Thread yilun zhang
Hmm, looks like I may be failing due to the Docker environment:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in 
run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
|beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 583, in __exit__
self.result.wait_until_finish()
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 581, in wait_until_finish
raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program "docker": error=2, No such file or
directory
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:451)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:436)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No
such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 28 more


I tried to create a YAML like the following to mount Docker locally:
apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name: docke

Re: using python sdk+kafka under k8s

2021-03-04 Thread yilun zhang
We created a custom Docker image, which includes a Java runtime, Python, and a
Docker environment, to run our job. But we encountered a timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
--runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
--flink_submit_uber_jar --environment_type=EXTERNAL
--environment_config=localhost:5
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
TaskManager with id 10.190.29.80:6122-88ce88  timed out.
at
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
at
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Our test code is super simple:


import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText

def run_pipeline():
  with beam.Pipeline(options=PipelineOptions(
      runner="FlinkRunner",
      flink_master="beam-flink-cluster-jobmanager:8081",
      environment_type="EXTERNAL",
      environment_config="localhost:5")) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
           consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
                            'auto.offset.reset': 'latest'},
           topics=['test001'])
     | 'Par with 1' >> beam.Map(lambda word: (word, 1))
     | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
     | 'Group by key' >> beam.GroupByKey()
     | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
     #  | "Write to Kafka" >> WriteToKafka(
     #        producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
     #        topic='test001')
     | 'Write to text' >> WriteToText("/tmp/output2")
    )

if __name__ == '__main__':
  run_pipeline()


Is there any suggestion on a debugging direction? In the Flink UI, it looks like
it failed at the first step, ReadFromKafka.

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver  wrote:

> In Beam, the Kafka connector does not know anything about the underlying
> execution engine (here Flink). It is instead translated by the runner into
> a user defined function in Flink. So it is expected that the resulting DAG
> does not look the same as it would with a native Flink source.
>
> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang  wrote:
>
>> So sorry that subscription errors on my side resulted in multiple duplicate
>> emails!
>>
>> Thanks for reply and it does help!
>>
>> I am confused: when submitting a Beam job with the Kafka connector to Flink, I
>> noticed that the Flink DAG diagram includes ReadFromKafka as part of the Flink
>> workflow, while if we submit a PyFlink job (connected with Kafka) directly
>> to Flink, the Flink workflow excludes reading from Kafka (which is the
>> source) and only has the data processing parts.
>>
>> Is that how Beam intends Flink to work?
>>
>> Thanks a lot, and I sincerely apologize again for the duplicate emails!
>>
>> Yilun
>>
>> On Thursday, February 25, 2021 at 11:58 AM, Sam Bourne wrote:
>>
>>> Hi Yilun!
>>>
>>> I made a quick proof of concept repo showcasing how