[jira] [Commented] (BEAM-3788) Implement a Kafka IO for Python SDK

2020-03-15 Thread Nicolae Rosia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059941#comment-17059941
 ] 

Nicolae Rosia commented on BEAM-3788:
-

Until this is fixed, is there a way to implement an unbounded source in Python? 
If so, I could work around this by using a Kafka library in Python, such 
as [https://pypi.org/project/kafka-python/].

> Implement a Kafka IO for Python SDK
> ---
>
> Key: BEAM-3788
> URL: https://issues.apache.org/jira/browse/BEAM-3788
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>
> Java KafkaIO will be made available to Python users as a cross-language 
> transform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=403652&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403652
 ]

ASF GitHub Bot logged work on BEAM-9468:


Author: ASF GitHub Bot
Created on: 15/Mar/20 19:27
Start Date: 15/Mar/20 19:27
Worklog Time Spent: 10m 
  Work Description: jaketf commented on pull request #11107: [BEAM-9468] 
[WIP] add HL7v2IO and FhirIO
URL: https://github.com/apache/beam/pull/11107#discussion_r392636894
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.api.services.healthcare.v1alpha2.model.HttpBody;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@link FhirIO} provides an API for writing resources to <a href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API</a>.
+ * 
+ */
+public class FhirIO {
+
+  /** The type Write. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HttpBody>, PDone> {
+
+    /** The enum Write method. */
+    public enum WriteMethod {
+      /**
+       * Execute Bundle Method executes a batch of requests as a single transaction @see <a href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+       */
+      EXECUTE_BUNDLE,
+      /**
+       * Create Method creates a single FHIR resource @see <a href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>.
+       */
+      CREATE
+    }
+
+    /**
+     * Gets Fhir store.
+     *
+     * @return the Fhir store
+     */
+    abstract String getFhirStore();
+
+    /**
+     * Gets write method.
+     *
+     * @return the write method
+     */
+    abstract WriteMethod getWriteMethod();
+
+    /** The type Builder. */
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /**
+       * Sets Fhir store.
+       *
+       * @param fhirStore the Fhir store
+       * @return the Fhir store
+       */
+      abstract Builder setFhirStore(String fhirStore);
+
+      /**
+       * Sets write method.
+       *
+       * @param writeMethod the write method
+       * @return the write method
+       */
+      abstract Builder setWriteMethod(WriteMethod writeMethod);
+
+      /**
+       * Build write.
+       *
+       * @return the write
+       */
+      abstract Write build();
+    }
+
+    private static Write.Builder write(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder().setFhirStore(fhirStore);
+    }
+
+    /**
+     * Create Method creates a single FHIR resource. @see <a href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write create(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(Write.WriteMethod.CREATE)
+          .build();
+    }
+
+    /**
+     * Execute Bundle Method executes a batch of requests as a single transaction @see <a href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write executeBundles(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(WriteMethod.EXECUTE_BUNDLE)
+          .build();
+    }
+
+    @Override
+    public PDone expand(PCollection<HttpBody> messages) {
+      messages.apply(ParDo.of(new 

[jira] [Work logged] (BEAM-9035) BIP-1: Typed options for Row Schema and Fields

2020-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9035?focusedWorklogId=403646&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403646
 ]

ASF GitHub Bot logged work on BEAM-9035:


Author: ASF GitHub Bot
Created on: 15/Mar/20 18:04
Start Date: 15/Mar/20 18:04
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #10413: [BEAM-9035] Typed 
options for Row Schema and Field
URL: https://github.com/apache/beam/pull/10413#issuecomment-599244074
 
 
   Taking another look. I'm fine with metadata being in a separate PR - it
   makes it easier to review. I just wanted to ensure that metadata was still
   in the plan.
   
   On Sun, Mar 15, 2020 at 4:24 AM Alex Van Boxel 
   wrote:
   
   > I think I have now addressed all the issues in the latest fixup.
   >
   > Talking about field metadata: yes, this will be another PR. I didn't want
   > this all in one PR. This is the groundwork to get a decent API in. I have
   > at least 2 related PRs (proto and avro support) waiting before tackling
   > removal of metadata though.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 403646)
Time Spent: 8h  (was: 7h 50m)

> BIP-1: Typed options for Row Schema and Fields
> --
>
> Key: BEAM-9035
> URL: https://issues.apache.org/jira/browse/BEAM-9035
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Alex Van Boxel
>Assignee: Alex Van Boxel
>Priority: Major
> Fix For: 2.19.0
>
>  Time Spent: 8h
>  Remaining Estimate: 0h
>
> This is the first issue of a multipart commit: this ticket implements the 
> basic infrastructure of options on row and field.
> Full explanation:
> Introduce the concept of Options in Beam Schemas to add extra context to 
> fields and schemas. In contrast to metadata, options would be added to 
> fields, logical types and rows. In the options, schema converters can add 
> options/annotations/decorators that were in the original schema; this context 
> can be used in the rest of the pipeline for specific transformations or to 
> augment the end schema in the target output.
> Examples of options are:
>  * informational: like the source of the data, ...
>  * drive decisions further in the pipeline: flatten a row into another, 
> rename a field, ...
>  * influence something in the output: like cluster index, primary key, ...
>  * logical type information
> An option is a key/typed value combination. The advantages of having 
> typed values are: 
>  * Having strongly typed options would give Logical Types a *portable way* 
> to have structured information that could be shared across different languages.
>  * This could keep the type intact when mapping from formats that have 
> strongly typed options (example: Protobuf).
> This is part of a multi-ticket implementation. The following tickets are 
> related:
>  # Typed options for Row Schema and Fields
>  # Convert Proto Options to Beam Schema options
>  # Convert Avro extra information for Beam string options
>  # Replace metadata with Logical Type options
>  # Extract metadata in Calcite SQL to Beam options
>  # Extract metadata in Zeta SQL to Beam options
>  # Add Java example of using option in a transform 
> This feature was discussed with Reuven Lax and Brian Hulette.
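
For readers unfamiliar with the idea, the "key/typed value combination" described above can be sketched in plain Python. This is only an illustration of the concept, not Beam's API: the class names `Option` and `Options`, the `get` method, and the sample option names are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Option:
    """Hypothetical key/typed-value pair (illustration only, not a Beam class)."""
    name: str
    value_type: type
    value: Any

    def __post_init__(self) -> None:
        # Reject a value that does not match the declared type.
        if not isinstance(self.value, self.value_type):
            raise TypeError(
                f"option {self.name!r} expects {self.value_type.__name__}, "
                f"got {type(self.value).__name__}")


class Options:
    """Hypothetical collection of options attached to a field or schema."""

    def __init__(self, *options: Option) -> None:
        self._by_name: Dict[str, Option] = {o.name: o for o in options}

    def get(self, name: str, expected_type: type) -> Any:
        # The caller states the type it expects, so a mismatch fails loudly
        # instead of silently handing back a value of another type.
        option = self._by_name[name]
        if option.value_type is not expected_type:
            raise TypeError(
                f"option {name!r} is {option.value_type.__name__}, "
                f"not {expected_type.__name__}")
        return option.value


# Options that a schema converter might carry over from a source schema.
field_options = Options(
    Option("primary_key", bool, True),
    Option("source", str, "protobuf"),
)
```

Because each option records its own type, a later stage of a pipeline can read `field_options.get("primary_key", bool)` without guessing how the value was encoded, which is the advantage over string-only metadata that the description points to.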





[jira] [Assigned] (BEAM-5265) Can not test Timer with processing time domain

2020-03-15 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek reassigned BEAM-5265:
--

Assignee: (was: Jozef Vilcek)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer on the PROCESSING_TIME domain. While 
> writing tests, I noticed that it does not react to `advanceProcessingTime()` 
> on the test stream. The problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only tell that patching this place works for direct runner tests. I am 
> not sure about the broader impact on other runners, since it is in `runner-core`.





[jira] [Work logged] (BEAM-9035) BIP-1: Typed options for Row Schema and Fields

2020-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9035?focusedWorklogId=403574&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403574
 ]

ASF GitHub Bot logged work on BEAM-9035:


Author: ASF GitHub Bot
Created on: 15/Mar/20 11:24
Start Date: 15/Mar/20 11:24
Worklog Time Spent: 10m 
  Work Description: alexvanboxel commented on issue #10413: [BEAM-9035] 
Typed options for Row Schema and Field
URL: https://github.com/apache/beam/pull/10413#issuecomment-599197547
 
 
   I think I have now addressed all the issues in the latest fixup. 
   
   Talking about field metadata: yes, this will be another PR. I didn't want 
   this all in one PR. This is the groundwork to get a decent API in. I have at 
   least 2 related PRs (proto and avro support) waiting before tackling removal 
   of metadata though.
 



Issue Time Tracking
---

Worklog Id: (was: 403574)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-03-15 Thread Nicolae Rosia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059596#comment-17059596
 ] 

Nicolae Rosia commented on BEAM-9046:
-

Hello, I have the same issue with either of these versions:
 * 2.19.0
 * 2.20.0
 * master

+ Flink 1.9 + Kafka

Is there a known working example from which I could start?

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -
>
> Key: BEAM-9046
> URL: https://issues.apache.org/jira/browse/BEAM-9046
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-kafka
>Affects Versions: 2.16.0
>Reporter: Berkay Öztürk
>Priority: Major
>  Labels: KafkaIO, Python
> Fix For: Not applicable
>
>
>  I'm trying to read the data streaming from Apache Kafka using the Python SDK 
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink 
> 1.8.3, I follow these steps:
>  * Compile and run Beam 2.16 with Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
> {code}
>  * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
> if __name__ == '__main__':
>     with Pipeline(options=PipelineOptions([
>         '--runner=FlinkRunner',
>         '--flink_version=1.8',
>         '--flink_master_url=localhost:8081',
>         '--environment_type=LOOPBACK',
>         '--streaming'
>     ])) as pipeline:
>         (
>             pipeline
>             | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
>         )
>         result = pipeline.run()
>         result.wait_until_finish()
> {code}
>  * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 13 more
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
> at 
>