[jira] [Created] (BEAM-8337) Publish portable job server container images
Kyle Weaver created BEAM-8337: - Summary: Publish portable job server container images Key: BEAM-8337 URL: https://issues.apache.org/jira/browse/BEAM-8337 Project: Beam Issue Type: Improvement Components: runner-flink, runner-spark Reporter: Kyle Weaver Could be added to the release process similar to how we now publish SDK worker images. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8336) Make dockerized job server image configurable
[ https://issues.apache.org/jira/browse/BEAM-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver closed BEAM-8336. - Fix Version/s: Not applicable Resolution: Duplicate > Make dockerized job server image configurable > - > > Key: BEAM-8336 > URL: https://issues.apache.org/jira/browse/BEAM-8336 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Kyle Weaver > Assignee: Kyle Weaver > Priority: Major > Labels: portability-flink > Fix For: Not applicable > > > I'd also argue that having PortableRunner bring up a Flink job server by > default is unexpected behavior, but touching that would be a breaking change. > It might be good to move some of this logic to Python's FlinkRunner class so > we can have runner-specific defaults. > [https://github.com/apache/beam/blob/c5f43342f914fc8ff367b86fb9294c38436ed3ce/sdks/python/apache_beam/runners/portability/job_server.py#L252-L254]
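The issue above suggests moving job-server defaulting into Python's FlinkRunner so each runner can pick its own default image. A minimal sketch of that idea, with hypothetical class, option, and image names (this is not Beam's actual API):

```python
# Hypothetical sketch of runner-specific job server image defaults, as
# suggested in BEAM-8336. Names (JobServer, FlinkRunner, the option key,
# and the image tag) are illustrative only.
class JobServer:
    def __init__(self, image):
        self.image = image

class FlinkRunner:
    # Hypothetical default; a real image name/tag would come from the release.
    DEFAULT_JOB_SERVER_IMAGE = "apache/beam_flink_job_server:latest"

    def default_job_server(self, options):
        # Prefer a user-supplied image; fall back to the runner-specific default.
        image = options.get("flink_job_server_image") or self.DEFAULT_JOB_SERVER_IMAGE
        return JobServer(image)

runner = FlinkRunner()
server = runner.default_job_server({})
```

The point of the design is that PortableRunner itself would no longer hard-code a Flink job server; each runner subclass supplies its own default.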
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321648&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321648 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330328231 ## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaProvider.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.DynamicMessage; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Schema provider for Protobuf messages.
The provider is able to handle pre-compiled Message types + * without external help. For DynamicMessages a Descriptor needs to be registered up front under a + * specific URN. + * + * It's possible to inherit from this class for a specific implementation that communicates with an + * external registry that maps those URNs to Descriptors. + */ +@Experimental(Experimental.Kind.SCHEMAS) +public class ProtoSchemaProvider implements SchemaProvider { Review comment: Is the need for `withFieldValueGettersHandleCollections(true)` the only reason this can't be made to implement `GetterBasedSchemaProvider`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321648) Time Spent: 7h 50m (was: 7h 40m) > Protobuf Beam Schema support > > > Key: BEAM-7274 > URL: https://issues.apache.org/jira/browse/BEAM-7274 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core > Reporter: Alex Van Boxel > Assignee: Alex Van Boxel > Priority: Minor > Fix For: 2.17.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > Add support for the new Beam Schema to the Protobuf extension.
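The javadoc above says DynamicMessages need a Descriptor registered up front under a specific URN, and that a subclass could consult an external registry. A rough Python sketch of such a registry (hypothetical names throughout; this is not the Java API under review):

```python
# Hypothetical URN -> descriptor registry, sketching the idea in the
# ProtoSchemaProvider javadoc: pre-registered descriptors let the provider
# handle DynamicMessages it cannot introspect on its own.
class DescriptorRegistry:
    def __init__(self):
        self._by_urn = {}

    def register(self, urn, descriptor):
        self._by_urn[urn] = descriptor

    def lookup(self, urn):
        # A subclass could instead consult an external schema registry here.
        if urn not in self._by_urn:
            raise KeyError("no descriptor registered for URN: " + urn)
        return self._by_urn[urn]

registry = DescriptorRegistry()
registry.register("urn:example:my_message", {"name": "MyMessage"})
```

The registry indirection is what makes the provider extensible: the lookup can be overridden without changing how schemas are built from the resolved Descriptor.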
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321644&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321644 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330332563 ## File path: sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaValuesTest.java ## @@ -0,0 +1,670 @@ +/* [Apache License 2.0 header, as above]
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTest.COMPLEX_DEFAULT_ROW; +import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTest.MESSAGE_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTest.PRIMITIVE_DEFAULT_ROW; +import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTest.REPEAT_PRIMITIVE_DEFAULT_ROW; +import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTest.WKT_MESSAGE_DEFAULT_ROW; +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.BoolValue; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.DoubleValue; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.FloatValue; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** Collection of tests for values on Protobuf Messages and Rows. 
*/ +@RunWith(Parameterized.class) +public class ProtoSchemaValuesTest { + + private final Message proto; + private final Row rowObject; + private SerializableFunction toRowFunction; + private SerializableFunction fromRowFunction; + + public ProtoSchemaValuesTest(String description, Message proto, Row rowObject) { +this.proto = proto; +this.rowObject = rowObject; + } + + @Parameterized.Parameters(name = "{0}") + public static Collection data() { +List data = new ArrayList<>(); +data.add( +new Object[] { + "primitive_int32", + Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveInt32(Integer.MAX_VALUE).build(), + change(PRIMITIVE_DEFAULT_ROW, "primitive_int32", Integer.MAX_VALUE) +}); +data.add( +new Object[] { + "primitive_int64", + Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveInt64(Long.MAX_VALUE).build(), + change(PRIMITIVE_DEFAULT_ROW, "primitive_int64", Long.MAX_VALUE) +}); +data.add( +new Object[] { + "primitive_uint32", + Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveUint32(Integer.MAX_VALUE).build(), + change(PRIMITIVE_DEFAULT_ROW, "primitive_uint32", Integer.MAX_VALUE) +}); +data.add( +new Object[] { + "primitive_uint64", + Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveUint64(Long.MAX_VALUE).build(), + change(PRIMITIVE_DEFAULT_ROW, "primitive_uint64", Long.MAX_VALUE) +}); +data.add( +new Object[] { + "primitive_sint32", + Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveSint32(Integer.MAX_VALUE).build(), + change(PRIMITIVE_DEFAULT_ROW,
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321646&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321646 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330331146 ## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoFieldOverlay.java ## @@ -0,0 +1,525 @@ +/* [Apache License 2.0 header, as above]
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldValueGetter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.joda.time.Instant; + +/** + * Protobuf ProtoFieldOverlay is the interface that each implementation needs to implement to handle + * a specific field types. + */ +@Experimental(Experimental.Kind.SCHEMAS) +public interface ProtoFieldOverlay extends FieldValueGetter { + + ValueT convertGetObject(FieldDescriptor fieldDescriptor, Object object); + + /** Convert the Row field and set it on the overlayed field of the message. */ + void set(Message.Builder object, ValueT value); + + Object convertSetObject(FieldDescriptor fieldDescriptor, Object value); + + /** Return the Beam Schema Field of this overlayed field. 
*/ + Schema.Field getSchemaField(); + + abstract class ProtoFieldOverlayBase implements ProtoFieldOverlay { + +protected int number; + +private Schema.Field field; + +FieldDescriptor getFieldDescriptor(Message message) { + return message.getDescriptorForType().findFieldByNumber(number); +} + +FieldDescriptor getFieldDescriptor(Message.Builder message) { + return message.getDescriptorForType().findFieldByNumber(number); +} + +protected void setField(Schema.Field field) { + this.field = field; +} + +ProtoFieldOverlayBase(ProtoSchema protoSchema, FieldDescriptor fieldDescriptor) { + // this.fieldDescriptor = fieldDescriptor; + this.number = fieldDescriptor.getNumber(); +} + +@Override +public String name() { + return field.getName(); +} + +@Override +public Schema.Field getSchemaField() { + return field; +} + } + + /** Overlay for Protobuf primitive types. Primitive values are just passed through. */ + class PrimitiveOverlay extends ProtoFieldOverlayBase { +PrimitiveOverlay(ProtoSchema protoSchema, FieldDescriptor fieldDescriptor) { + // this.fieldDescriptor = fieldDescriptor; + super(protoSchema, fieldDescriptor); + setField( + Schema.Field.of( + fieldDescriptor.getName(), + ProtoSchema.convertType(fieldDescriptor.getType()) + .withMetadata(protoSchema.convertOptions(fieldDescriptor; +} + +@Override +public Object get(Message message) { + FieldDescriptor fieldDescriptor = getFieldDescriptor(message); + return convertGetObject(fieldDescriptor, message.getField(fieldDescriptor)); +} + +@Override +public Object convertGetObject(FieldDescriptor fieldDescriptor, Object object) { + return object; +} + +@Override +public void set(Message.Builder message, Object value) { + message.setField(getFieldDescriptor(message), value); +} + +@Override +public Object convertSetObject(FieldDescriptor fieldDescriptor, Object value) { + return
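The PrimitiveOverlay code quoted above stores only a field number and re-resolves the concrete FieldDescriptor on every get/set, passing primitive values straight through. A rough Python analogue of that overlay pattern (plain dicts stand in for Message/Builder; not the Java code under review):

```python
# Sketch of the ProtoFieldOverlay idea: an overlay keeps a field number and a
# schema-field name, resolves the field at access time, and passes primitive
# values through unconverted. Messages are modeled as plain dicts here.
class PrimitiveOverlay:
    def __init__(self, number, name):
        self.number = number  # stand-in for FieldDescriptor.getNumber()
        self.name = name      # stand-in for Schema.Field name

    def _field_key(self, message):
        # Stand-in for message.getDescriptorForType().findFieldByNumber(number):
        # resolution happens per access, so the overlay holds no descriptor.
        return self.number

    def get(self, message):
        # Primitive values are returned unconverted (convertGetObject is identity).
        return message.get(self._field_key(message))

    def set(self, builder, value):
        # Primitive values are set unconverted (convertSetObject is identity).
        builder[self._field_key(builder)] = value

overlay = PrimitiveOverlay(1, "primitive_int32")
msg = {}
overlay.set(msg, 42)
```

Resolving by number at access time is what keeps the overlay serializable without capturing a (non-serializable) FieldDescriptor.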
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321645 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330330163 ## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchema.java ## @@ -0,0 +1,563 @@ +/* [Apache License 2.0 header, as above]
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.UnknownFieldSet; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Factory; +import org.apache.beam.sdk.schemas.FieldValueGetter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** + * ProtoSchema is a top-level anchor point. It makes sure it can recreate the complete schema and + * overlay with just the Message raw type or, if it's a DynamicMessage, with the serialised + * Descriptor. + * + * ProtoDomain is an integral part of a ProtoSchema, as it contains all the information needed to + * interpret and reconstruct messages. + * + * + * Protobuf oneOf fields are mapped to nullable fields and flattened into the parent row. + * Protobuf primitives are mapped to their nullable counterparts. + * Protobuf maps are mapped to nullable maps, where empty maps are mapped to the null value. + * Protobuf repeated fields are mapped to nullable arrays, where empty arrays are mapped to the + * null value.
+ * Protobuf enums are mapped to non-nullable string values. + * Enum map to their string representation Review comment: nit: Are these two lines saying the same thing? Issue Time Tracking --- Worklog Id: (was: 321645)
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321643 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330330429 ## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchema.java ## @@ -0,0 +1,563 @@ +/* [Apache License 2.0 header, as above]
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.UnknownFieldSet; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Factory; +import org.apache.beam.sdk.schemas.FieldValueGetter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** + * ProtoSchema is a top-level anchor point. It makes sure it can recreate the complete schema and + * overlay with just the Message raw type or, if it's a DynamicMessage, with the serialised + * Descriptor. + * + * ProtoDomain is an integral part of a ProtoSchema, as it contains all the information needed to + * interpret and reconstruct messages. + * + * + * Protobuf oneOf fields are mapped to nullable fields and flattened into the parent row. + * Protobuf primitives are mapped to their nullable counterparts. + * Protobuf maps are mapped to nullable maps, where empty maps are mapped to the null value. + * Protobuf repeated fields are mapped to nullable arrays, where empty arrays are mapped to the + * null value.
Review comment: Why not just use non-nullable maps and arrays and produce empty ones when the proto ones are empty? Issue Time Tracking --- Worklog Id: (was: 321643) Time Spent: 7h 10m (was: 7h)
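The javadoc quoted in these entries maps empty proto maps and repeated fields to null, which is what the review comment questions. An illustrative sketch of the two alternatives (plain Python, not Beam code):

```python
# Sketch of the mapping described in the ProtoSchema javadoc: empty proto
# maps/repeated fields become null (None) in the Row. The reviewer's
# alternative keeps them as non-nullable empty containers instead.
def map_collection_to_row_value(value, nullable_when_empty=True):
    if nullable_when_empty and len(value) == 0:
        return None  # current mapping: empty map/array -> null
    return value     # non-empty values (and the reviewer's variant) pass through

current = map_collection_to_row_value([])
alternative = map_collection_to_row_value([], nullable_when_empty=False)
```

The trade-off: the null mapping preserves the distinction "absent vs. empty" in the Row, while the non-nullable variant gives simpler schemas since consumers never need a null check on collections.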
[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support
[ https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=321647&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321647 ] ASF GitHub Bot logged work on BEAM-7274: Author: ASF GitHub Bot Created on: 02/Oct/19 00:38 Start Date: 02/Oct/19 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #8690: [BEAM-7274] Implement the Protobuf schema provider URL: https://github.com/apache/beam/pull/8690#discussion_r330302028 ## File path: sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTest.java ## @@ -0,0 +1,577 @@ +/* [Apache License 2.0 header, as above]
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import java.io.IOException; +import java.util.Objects; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Collection of standard tests for Protobuf Schema support. */ +@RunWith(JUnit4.class) +public class ProtoSchemaTest { + + private static final Schema PRIMITIVE_SCHEMA = + Schema.builder() + .addDoubleField("primitive_double") + .addFloatField("primitive_float") + .addInt32Field("primitive_int32") + .addInt64Field("primitive_int64") + .addInt32Field("primitive_uint32") + .addInt64Field("primitive_uint64") + .addInt32Field("primitive_sint32") + .addInt64Field("primitive_sint64") + .addInt32Field("primitive_fixed32") + .addInt64Field("primitive_fixed64") + .addInt32Field("primitive_sfixed32") + .addInt64Field("primitive_sfixed64") + .addBooleanField("primitive_bool") + .addStringField("primitive_string") + .addByteArrayField("primitive_bytes") + .build(); + static final Row PRIMITIVE_DEFAULT_ROW = + Row.withSchema(PRIMITIVE_SCHEMA) + .addValue((double) 0) + .addValue((float) 0) + .addValue(0) + .addValue(0L) + .addValue(0) + .addValue(0L) + .addValue(0) + .addValue(0L) + .addValue(0) + .addValue(0L) + .addValue(0) + .addValue(0L) + .addValue(Boolean.FALSE) + .addValue("") + .addValue(new byte[] {}) + .build(); + static final Schema MESSAGE_SCHEMA = + Schema.builder() + .addField("message", 
Schema.FieldType.row(PRIMITIVE_SCHEMA).withNullable(true)) + .addField( + "repeated_message", + Schema.FieldType.array( + // TODO: are the nullable's correct + Schema.FieldType.row(PRIMITIVE_SCHEMA).withNullable(true)) + .withNullable(true)) + .build(); + private static final Row MESSAGE_DEFAULT_ROW = + Row.withSchema(MESSAGE_SCHEMA).addValue(null).addValue(null).build(); + private static final Schema REPEAT_PRIMITIVE_SCHEMA = + Schema.builder() + .addField( + "repeated_double", Schema.FieldType.array(Schema.FieldType.DOUBLE).withNullable(true)) + .addField( + "repeated_float", Schema.FieldType.array(Schema.FieldType.FLOAT).withNullable(true)) + .addField( + "repeated_int32", Schema.FieldType.array(Schema.FieldType.INT32).withNullable(true)) + .addField( + "repeated_int64", Schema.FieldType.array(Schema.FieldType.INT64).withNullable(true)) + .addField( + "repeated_uint32",
[jira] [Work logged] (BEAM-7657) sdk worker parallelism comments are misleading
[ https://issues.apache.org/jira/browse/BEAM-7657?focusedWorklogId=321642&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321642 ] ASF GitHub Bot logged work on BEAM-7657: Author: ASF GitHub Bot Created on: 02/Oct/19 00:37 Start Date: 02/Oct/19 00:37 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9709: [BEAM-7657] fix misleading sdk_worker_parallelism description URL: https://github.com/apache/beam/pull/9709 Context: https://issues.apache.org/jira/browse/BEAM-7657 R: @angoenka Post-Commit Tests Status (on master branch): [badge table omitted]
[jira] [Work logged] (BEAM-7981) ParDo function wrapper doesn't support Iterable output types
[ https://issues.apache.org/jira/browse/BEAM-7981?focusedWorklogId=321635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321635 ] ASF GitHub Bot logged work on BEAM-7981: Author: ASF GitHub Bot Created on: 02/Oct/19 00:20 Start Date: 02/Oct/19 00:20 Worklog Time Spent: 10m Work Description: udim commented on pull request #9708: [BEAM-7981] Fix double iterable stripping URL: https://github.com/apache/beam/pull/9708 ... in CallableWrapperDoFn Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- |
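The PR above ("Fix double iterable stripping ... in CallableWrapperDoFn") concerns a wrapper that flattens an iterable returned by a map-style function into individual output elements; flattening twice would wrongly split strings into characters or unpack nested elements. A minimal sketch of the invariant the fix restores — the helper name is illustrative, not Beam's actual `CallableWrapperDoFn`:

```python
def wrap_callable(fn):
    # Flatten the returned iterable exactly once. Stripping a second
    # time would explode each yielded element (e.g. a string into
    # characters), which is the bug class the PR title refers to.
    def do_fn(element):
        result = fn(element)
        if hasattr(result, "__iter__") and not isinstance(result, (str, bytes)):
            for item in result:
                yield item
        else:
            yield result
    return do_fn

split_words = wrap_callable(lambda line: line.split())
print(list(split_words("hello beam")))  # -> ['hello', 'beam']
print(list(wrap_callable(lambda x: x + 1)(1)))  # -> [2]
```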
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321633=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321633 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 02/Oct/19 00:07 Start Date: 02/Oct/19 00:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321633) Time Spent: 3h 20m (was: 3h 10m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 3h 20m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. 
> > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
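The idea in BEAM-8312 — serving artifacts straight from the jar instead of re-staging them through a remote filesystem — can be illustrated with a small sketch. Beam's actual implementation is a Java gRPC `ArtifactRetrievalService`; here a Python stand-in reads an artifact out of an in-memory zip (a jar is a zip), with all names hypothetical:

```python
import io
import zipfile

# Build an in-memory "jar" containing a staged artifact, the way a
# portable pipeline jar bundles its manifest and artifacts.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as jar:
    jar.writestr("artifacts/requirements.txt", "apache-beam==2.16.0\n")

def retrieve_artifact(jar_bytes, path):
    # Stand-in for a retrieval service that resolves artifact names
    # against the jar contents rather than a remote staging directory.
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        return jar.read(path).decode("utf-8")

print(retrieve_artifact(buf.getvalue(), "artifacts/requirements.txt"))
# -> apache-beam==2.16.0
```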
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321632=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321632 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 02/Oct/19 00:05 Start Date: 02/Oct/19 00:05 Worklog Time Spent: 10m Work Description: robertwb commented on issue #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#issuecomment-537280471 The failures at org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful are irrelevant. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321632) Time Spent: 3h 10m (was: 3h) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 3h 10m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. 
> > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valentyn Tymofieiev resolved BEAM-7223. --- Resolution: Fixed > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5h 40m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321630 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 02/Oct/19 00:02 Start Date: 02/Oct/19 00:02 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#issuecomment-537277609 cc: @ibzib - Spark Python 3 tests can follow a similar pattern. cc: @robertwb as FYI that we added PVR Flink Python 3.5 suite to postcommits (see description). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321630) Time Spent: 5h 40m (was: 5.5h) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5h 40m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321629=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321629 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:58 Start Date: 01/Oct/19 23:58 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330325673 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Actually, that was just an extra new line, just in this file. no follow up changes needed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321629) Time Spent: 5.5h (was: 5h 20m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5.5h > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321627=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321627 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:52 Start Date: 01/Oct/19 23:52 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#issuecomment-537277609 cc: @ibzib @robertwb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321627) Time Spent: 5h 20m (was: 5h 10m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5h 20m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321626=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321626 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:51 Start Date: 01/Oct/19 23:51 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321626) Time Spent: 5h 10m (was: 5h) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5h 10m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321625=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321625 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:49 Start Date: 01/Oct/19 23:49 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330323560 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Thanks, I'll remove this in a follow up change since I'd like to keep the history that tests passed on this PR without running another seed job. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321625) Time Spent: 5h (was: 4h 50m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 5h > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321624=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321624 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:48 Start Date: 01/Oct/19 23:48 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330321807 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Ok, I'll add it back since now we get no new line at the end of file :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321624) Time Spent: 4h 50m (was: 4h 40m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4h 50m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321623=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321623 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:48 Start Date: 01/Oct/19 23:48 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330320315 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Thanks, probably removed by editor on autosave. I don't think we need them, so I'll remove them in other files touched by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321623) Time Spent: 4h 40m (was: 4.5h) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4h 40m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321621 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:40 Start Date: 01/Oct/19 23:40 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330321807 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Ok, I'll add it back since now we get no new line at the end of file :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321621) Time Spent: 4.5h (was: 4h 20m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4.5h > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321618=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321618 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 23:34 Start Date: 01/Oct/19 23:34 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330320315 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: Thanks, probably removed by editor on autosave. I don't think we need them, so I'll remove them in other files touched by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321618) Time Spent: 4h 20m (was: 4h 10m) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4h 20m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8336) Make dockerized job server image configurable
Kyle Weaver created BEAM-8336: - Summary: Make dockerized job server image configurable Key: BEAM-8336 URL: https://issues.apache.org/jira/browse/BEAM-8336 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Kyle Weaver Assignee: Kyle Weaver I'd also argue that having PortableRunner bring up a Flink job server by default is unexpected behavior, but touching that would be a breaking change. It might be good to move some of this logic to Python's FlinkRunner class so we can have runner-specific defaults. [https://github.com/apache/beam/blob/c5f43342f914fc8ff367b86fb9294c38436ed3ce/sdks/python/apache_beam/runners/portability/job_server.py#L252-L254] -- This message was sent by Atlassian Jira (v8.3.4#803005)
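The "runner-specific defaults" suggestion above amounts to: a generic PortableRunner has no implicit job server image, while a FlinkRunner subclass supplies one that an explicit pipeline option can override. A hedged sketch of that pattern — class names, option names, and the image tag are illustrative, not Beam's actual API:

```python
class PortableRunnerBase:
    # A generic portable runner has no implicit job server image.
    default_job_server_image = None

    def job_server_image(self, options):
        # An explicitly configured image always wins over the runner default.
        return options.get("job_server_image") or self.default_job_server_image

class FlinkRunner(PortableRunnerBase):
    # Runner-specific default, used only when the user did not
    # configure an image explicitly.
    default_job_server_image = "apache/beam_flink_job_server:latest"

runner = FlinkRunner()
print(runner.job_server_image({}))  # -> apache/beam_flink_job_server:latest
print(runner.job_server_image({"job_server_image": "my/image:dev"}))  # -> my/image:dev
```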
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321603 ] ASF GitHub Bot logged work on BEAM-7933: Author: ASF GitHub Bot Created on: 01/Oct/19 22:33 Start Date: 01/Oct/19 22:33 Worklog Time Spent: 10m Work Description: ibzib commented on issue #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds) URL: https://github.com/apache/beam/pull/9673#issuecomment-537260245 Please fix lint errors. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321603) Time Spent: 2h 20m (was: 2h 10m) > Adding timeout to JobServer grpc calls > -- > > Key: BEAM-7933 > URL: https://issues.apache.org/jira/browse/BEAM-7933 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.14.0 >Reporter: Enrico Canzonieri >Assignee: Enrico Canzonieri >Priority: Minor > Labels: portability > Time Spent: 2h 20m > Remaining Estimate: 0h > > grpc calls to the JobServer from the Python SDK do not have timeouts. That > means that the call to pipeline.run() could hang forever if the JobServer is > not running (or failing to start). > E.g. > [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] > the call to Prepare() doesn't provide any timeout value and the same applies > to other JobServer requests. > As part of this ticket we could add a default timeout of 60 seconds, as is the > default for a typical HTTP client. 
> Additionally, we could consider adding a --job-server-request-timeout to the > [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] > class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
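The fix proposed in BEAM-7933 is to pass a deadline to each job-service RPC so that pipeline.run() fails fast instead of hanging (gRPC stub calls accept a `timeout=` keyword for exactly this). A stdlib-only sketch of the pattern, with a thread pool standing in for the gRPC channel and all function names hypothetical:

```python
import concurrent.futures
import time

def call_with_timeout(fn, timeout, *args):
    # Mirrors passing timeout= to a gRPC stub call: the request either
    # completes within the deadline or raises, instead of blocking forever.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn, *args)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            raise TimeoutError("job server did not respond within %ss" % timeout)

def fast_prepare():
    # Stand-in for a Prepare() call against a healthy job server.
    return "prepared"

def hung_prepare():
    # Stand-in for a job server that never started.
    time.sleep(0.5)

print(call_with_timeout(fast_prepare, 1.0))  # -> prepared
try:
    call_with_timeout(hung_prepare, 0.1)
except TimeoutError as exc:
    print("timed out:", exc)
```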
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321602=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321602 ] ASF GitHub Bot logged work on BEAM-7933: Author: ASF GitHub Bot Created on: 01/Oct/19 22:32 Start Date: 01/Oct/19 22:32 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds) URL: https://github.com/apache/beam/pull/9673#discussion_r330305499 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -251,7 +253,11 @@ def send_options_request(max_retries=5): # This reports channel is READY but connections may fail # Seems to be only an issue on Mac with port forwardings return job_service.DescribePipelineOptions( - beam_job_api_pb2.DescribePipelineOptionsRequest()) + beam_job_api_pb2.DescribePipelineOptionsRequest(), + timeout=portable_options.job_server_timeout) Review comment: Oh, I read everything wrong. :sweat_smile: It'd be great to have real tests to verify all these timeouts, but I guess they would not make suitable unit tests, as they would all require at least 1 second to complete. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321602) Time Spent: 2h 10m (was: 2h) > Adding timeout to JobServer grpc calls > -- > > Key: BEAM-7933 > URL: https://issues.apache.org/jira/browse/BEAM-7933 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.14.0 >Reporter: Enrico Canzonieri >Assignee: Enrico Canzonieri >Priority: Minor > Labels: portability > Time Spent: 2h 10m > Remaining Estimate: 0h > > grpc calls to the JobServer from the Python SDK do not have timeouts. 
That > means that the call to pipeline.run() could hang forever if the JobServer is > not running (or failing to start). > E.g. > [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] > the call to Prepare() doesn't provide any timeout value and the same applies > to other JobServer requests. > As part of this ticket we could add a default timeout of 60 seconds, as is the > default for a typical HTTP client. > Additionally, we could consider adding a --job-server-request-timeout to the > [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] > class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8335) Add streaming support to Interactive Beam
Sam Rohde created BEAM-8335: --- Summary: Add streaming support to Interactive Beam Key: BEAM-8335 URL: https://issues.apache.org/jira/browse/BEAM-8335 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde This issue tracks the work items to introduce streaming support to the Interactive Beam experience. This will allow users to: * Write and run a streaming job in IPython * Automatically cache records from unbounded sources * Add a replay experience that replays all cached records to simulate the original pipeline execution * Add controls to play/pause/stop/step individual elements from the cached records * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
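The replay experience described in BEAM-8335 reduces to caching timestamped records from an unbounded source and re-emitting them in event-time order. A minimal sketch under that assumption — the class and method names are illustrative, not Interactive Beam's API:

```python
class StreamCache:
    # Caches records read from an unbounded source so a later
    # interactive session can replay them.
    def __init__(self):
        self._records = []

    def write(self, timestamp, element):
        # Record each element with its event timestamp as it arrives.
        self._records.append((timestamp, element))

    def replay(self):
        # Re-emit cached records sorted by event time, simulating the
        # original pipeline execution; repeatable across sessions.
        for ts, element in sorted(self._records):
            yield ts, element

cache = StreamCache()
cache.write(2, "b")
cache.write(1, "a")
print(list(cache.replay()))  # -> [(1, 'a'), (2, 'b')]
```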
[jira] [Work logged] (BEAM-8287) Update documentation for Python 3 support after Beam 2.16.0.
[ https://issues.apache.org/jira/browse/BEAM-8287?focusedWorklogId=321601=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321601 ] ASF GitHub Bot logged work on BEAM-8287: Author: ASF GitHub Bot Created on: 01/Oct/19 22:28 Start Date: 01/Oct/19 22:28 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9700: [BEAM-8287] Python3 GA docs updates URL: https://github.com/apache/beam/pull/9700#discussion_r330304570 ## File path: website/src/roadmap/python-sdk.md ## @@ -22,7 +22,7 @@ limitations under the License. ## Python 3 Support -Apache Beam first offered Python 3.5 support with the 2.11.0 SDK release and added Python 3.6, Python 3.7 support with the 2.14.0 version. However, we continue to polish some [rough edges](https://issues.apache.org/jira/browse/BEAM-1251?focusedCommentId=16890504=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-1689050) and strengthen Beam's Python 3 offering: +Apache Beam supports Python 3.5 or higher with the 2.16.0 SDK release. Python 3.5 beta support was first offered with the 2.11.0 SDK release; beta support for Python 3.6, Python 3.7 was added with the 2.14.0 version. We continue to polish some [rough edges](https://issues.apache.org/jira/browse/BEAM-1251?focusedCommentId=16890504=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-1689050) and strengthen Beam's Python 3 offering: Review comment: Beam does not have notions of Alpha/Beta. I suggest the following wording: Apache Beam offers Python 3.5, 3.6, 3.7 support since Beam 2.14.0. We continue to polish remaining rough edges to improve the experience of Python 3 users and prepare for eventual sunsetting of Beam Python 2 offering: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321601) Time Spent: 1h (was: 50m) > Update documentation for Python 3 support after Beam 2.16.0. > > > Key: BEAM-8287 > URL: https://issues.apache.org/jira/browse/BEAM-8287 > Project: Beam > Issue Type: Sub-task > Components: website >Reporter: Valentyn Tymofieiev >Assignee: Cyrus Maden >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321595&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321595 ]

ASF GitHub Bot logged work on BEAM-7760:
Author: ASF GitHub Bot
Created on: 01/Oct/19 22:24
Start Date: 01/Oct/19 22:24
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330247591

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py

@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive import interactive_environment as ie
+
+READ_CACHE = "_ReadCache_"
+WRITE_CACHE = "_WriteCache_"
+
+
+class PipelineInstrument(object):
+  """A pipeline instrument for pipeline to be executed by interactive runner.
+
+  This module should never depend on underlying runner that interactive runner
+  delegates. It instruments the original instance of pipeline directly by
+  appending or replacing transforms with help of cache. It provides
+  interfaces to recover states of original pipeline. It's the interactive
+  runner's responsibility to coordinate supported underlying runners to run
+  the pipeline instrumented and recover the original pipeline states if needed.
+  """
+
+  def __init__(self, pipeline, options=None):
+    self._pipeline = pipeline
+    # The cache manager should be initiated outside of this module and outside
+    # of run_pipeline() from interactive runner so that its lifespan could cover
+    # multiple runs in the interactive environment. Owned by
+    # interactive_environment module. Not owned by this module.
+    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
+    # pipeline result instances because a pipeline is not 1:1 correlated to a
+    # running job. Only complete and read-only cache is valid across multiple
+    # jobs. Other cache instances should have their own scopes. Some design
+    # change should support only runner.run(pipeline) pattern rather than
+    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
+    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    self._cache_manager = ie.current_env().cache_manager()
+
+    # Invoke a round trip through the runner API. This makes sure the Pipeline
+    # proto is stable. The snapshot of pipeline will not be mutated within this
+    # module and can be used to recover original pipeline if needed.
+    self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
+        pipeline.to_runner_api(use_fake_coders=True),
+        pipeline.runner,
+        options)
+    # Snapshot of original pipeline information.
+    (self._original_pipeline_proto,
+     self._original_context) = self._pipeline_snap.to_runner_api(
+         return_context=True, use_fake_coders=True)
+
+    # All compute-once-against-original-pipeline fields.
+    self._has_unbounded_source = has_unbounded_source(self._pipeline_snap)
+    # TODO(BEAM-7760): once cache scope changed, this is not needed to manage
+    # relationships across pipelines, runners, and jobs.
+    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap,
+                                                  self._original_context)
+
+    # A mapping from PCollection id to python id() value in user defined
+    # pipeline instance.
+    (self._pcoll_version_map,
+     self._cacheables) = cacheables(self.pcolls_to_pcoll_id())
+
+    # A dict from cache key to PCollection that is read from cache.
+    # If
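The bookkeeping above maps PCollections in the user-defined pipeline to Python id() values, which is how Interactive Beam can tie cacheable PCollections back to notebook variables. The idea can be sketched in pure Python; note that FakePColl and watch_cacheables below are hypothetical stand-ins for illustration, not Beam's actual types or API (the real module derives its mapping from the pipeline proto, not from a namespace dict):

```python
class FakePColl:
    """Hypothetical stand-in for beam.pvalue.PCollection."""
    pass


def watch_cacheables(namespace):
    """Map variable names bound to PCollection-like objects to their id(),
    loosely mirroring the pcoll -> id() bookkeeping in pipeline_instrument."""
    return {
        name: id(value)
        for name, value in namespace.items()
        if isinstance(value, FakePColl)
    }


# In a notebook, the namespace would come from the user's globals.
squares, totals, not_a_pcoll = FakePColl(), FakePColl(), 42
mapping = watch_cacheables(
    {'squares': squares, 'totals': totals, 'not_a_pcoll': not_a_pcoll})
# Only the variables bound to PCollection-like objects are tracked.
```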
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321589&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321589 ]

ASF GitHub Bot logged work on BEAM-7933:
Author: ASF GitHub Bot
Created on: 01/Oct/19 22:21
Start Date: 01/Oct/19 22:21
Worklog Time Spent: 10m
Work Description: ecanzonieri commented on pull request #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds)
URL: https://github.com/apache/beam/pull/9673#discussion_r330302508

## File path: sdks/python/apache_beam/runners/portability/portable_runner.py

@@ -251,7 +253,11 @@ def send_options_request(max_retries=5):
       # This reports channel is READY but connections may fail
       # Seems to be only an issue on Mac with port forwardings
       return job_service.DescribePipelineOptions(
-          beam_job_api_pb2.DescribePipelineOptionsRequest())
+          beam_job_api_pb2.DescribePipelineOptionsRequest(),
+          timeout=portable_options.job_server_timeout)

Review comment: Hmm, I may be missing something, but the timeout is never added to the request. My understanding is that the timeout itself is not a request parameter but rather a gRPC method call parameter. @ibzib can you point me to the code where you see me adding the timeout to the request? Perhaps I accidentally added it to the wrong place. I'm no gRPC expert, but I've run some manual tests on this feature:

1) Run the Python driver without the job server running and the default timeout -> the driver times out as expected after 60 seconds.
2) Run the Python driver without the job server running and a changed timeout -> the driver times out as expected after the specified timeout.
3) Run the Python driver with the job server running but code changed to slow down Prepare() -> the driver times out as expected.
4) Run the Python driver with a long-running streaming job (Run() method) -> the driver program never times out (note this is the expected behavior, since the driver is stuck waiting for completion).

This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 321589)
Time Spent: 2h (was: 1h 50m)

> Adding timeout to JobServer grpc calls
> --
>
> Key: BEAM-7933
> URL: https://issues.apache.org/jira/browse/BEAM-7933
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.14.0
> Reporter: Enrico Canzonieri
> Assignee: Enrico Canzonieri
> Priority: Minor
> Labels: portability
> Time Spent: 2h
> Remaining Estimate: 0h
>
> gRPC calls to the JobServer from the Python SDK do not have timeouts. That
> means that the call to pipeline.run() could hang forever if the JobServer is
> not running (or failing to start).
> E.g.
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
> the call to Prepare() doesn't provide any timeout value, and the same applies
> to other JobServer requests.
> As part of this ticket we could add a default timeout of 60 seconds, similar
> to the defaults used by common HTTP clients.
> Additionally, we could consider adding a --job-server-request-timeout to the
> [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
> class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
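The point under discussion is that a gRPC deadline is a per-call argument (stub.Method(request, timeout=...)), not a field on the request message. The retry-plus-timeout shape of send_options_request can be sketched in plain Python; with_retries and the flaky fake call below are illustrative names under that assumption, not Beam or grpc APIs:

```python
import time


def with_retries(call, max_retries=5, timeout=60):
    """Invoke `call`, passing the timeout as a call argument (the way a gRPC
    stub method accepts `timeout=`), retrying on failure up to max_retries."""
    for attempt in range(max_retries):
        try:
            return call(timeout=timeout)
        except RuntimeError:
            if attempt == max_retries - 1:
                raise  # exhausted retries: surface the error to the driver
            time.sleep(0)  # a real client would back off between retries


# A fake job server call that fails twice before succeeding, standing in
# for something like DescribePipelineOptions on an unready channel.
attempts = {'n': 0}

def flaky_describe_options(timeout):
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise RuntimeError('channel not ready')
    return {'timeout_used': timeout}

result = with_retries(flaky_describe_options, max_retries=5, timeout=60)
# result -> {'timeout_used': 60}, reached on the third attempt
```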
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321587 ]

ASF GitHub Bot logged work on BEAM-7760:

Author: ASF GitHub Bot
Created on: 01/Oct/19 22:19
Start Date: 01/Oct/19 22:19
Worklog Time Spent: 10m
Work Description: KevinGG commented on issue #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#issuecomment-537256427

@aaltay I've added comments and made some changes to the original PR in the 2nd commit.
I'll draft a "vocabulary" doc for us to vote on and discuss the names of different components in an Interactive Beam notebook.
PTAL. Thank you very much!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 321587)
Time Spent: 16h (was: 15h 50m)

> Interactive Beam Caching PCollections bound to user defined vars in notebook
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
> Issue Type: New Feature
> Components: examples-python
> Reporter: Ning Kang
> Assignee: Ning Kang
> Priority: Major
> Time Spent: 16h
> Remaining Estimate: 0h
>
> Cache only PCollections bound to user defined variables in a pipeline when
> running the pipeline with the interactive runner in jupyter notebooks.
> [Interactive Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
> has been caching and using caches of "leaf" PCollections for interactive
> execution in jupyter notebooks.
> The interactive execution is currently supported so that when appending new
> transforms to an existing pipeline for a new run, the executed part of the
> pipeline doesn't need to be re-executed.
> A PCollection is "leaf" when it is never used as input in any PTransform in > the pipeline. > The problem with building caches and pipeline to execute around "leaf" is > that when a PCollection is consumed by a sink with no output, the pipeline to > execute built will miss the subgraph generating and consuming that > PCollection. > An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty > pipeline. > Caching around PCollections bound to user defined variables and replacing > transforms with source and sink of caches could resolve the pipeline to > execute properly under the interactive execution scenario. Also, cached > PCollection now can trace back to user code and can be used for user data > visualization if user wants to do it. > E.g., > {code:java} > // ... > p = beam.Pipeline(interactive_runner.InteractiveRunner(), > options=pipeline_options) > messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') > messages | "Write" >> beam.io.WriteToPubSub(topic_path) > result = p.run() > // ... > visualize(messages){code} > The interactive runner automatically figures out that PCollection > {code:java} > messages{code} > created by > {code:java} > p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} > should be cached and reused if the notebook user appends more transforms. > And once the pipeline gets executed, the user could use any > visualize(PCollection) module to visualize the data statically (batch) or > dynamically (stream) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321584 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 22:11 Start Date: 01/Oct/19 22:11 Worklog Time Spent: 10m Work Description: robertwb commented on issue #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#issuecomment-537254429 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321584) Time Spent: 3h (was: 2h 50m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 3h > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. 
> > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
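The actual change proposed in BEAM-8312 is Java (a simpler `ArtifactRetrievalService`), but the core idea — serve staged artifacts straight out of the job jar instead of re-staging them remotely — can be sketched with the stdlib `zipfile` module; the manifest layout below is illustrative, not Beam's real artifact manifest format:

```python
# Sketch: read a manifest and artifact bytes directly out of a jar (a jar is
# a zip file), rather than staging artifacts to a remote filesystem first.
import io
import json
import zipfile

def read_artifact(jar_bytes, path):
    """Pull one artifact's bytes straight out of an in-memory jar."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        return jar.read(path)

# Build a tiny fake "job jar" containing a manifest and one artifact.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as jar:
    jar.writestr('artifacts/MANIFEST', json.dumps({'artifacts': ['dep.txt']}))
    jar.writestr('artifacts/dep.txt', 'payload')

assert read_artifact(buf.getvalue(), 'artifacts/dep.txt') == b'payload'
```

Since the Flink classpath already ships the jar to every worker, reading in place avoids a second network copy of each artifact, which is exactly the redundancy the issue points out.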
[jira] [Work logged] (BEAM-8213) Run and report python tox tasks separately within Jenkins
[ https://issues.apache.org/jira/browse/BEAM-8213?focusedWorklogId=321582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321582 ] ASF GitHub Bot logged work on BEAM-8213: Author: ASF GitHub Bot Created on: 01/Oct/19 22:08 Start Date: 01/Oct/19 22:08 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9706: [BEAM-8213] Split out lint job from monolithic python preCommit tests on jenkins URL: https://github.com/apache/beam/pull/9706#discussion_r330298339 ## File path: build.gradle ## @@ -102,7 +102,7 @@ rat { // Json doesn't support comments. "**/*.json", - + Review comment: I just noticed my editor cleaned this up. Let me know if you want me to remove it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321582) Time Spent: 10h 10m (was: 10h) > Run and report python tox tasks separately within Jenkins > - > > Key: BEAM-8213 > URL: https://issues.apache.org/jira/browse/BEAM-8213 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Chad Dombrova >Priority: Major > Time Spent: 10h 10m > Remaining Estimate: 0h > > As a python developer, the speed and comprehensibility of the jenkins > PreCommit job could be greatly improved. > Here are some of the problems > - when a lint job fails, it's not reported in the test results summary, so > even though the job is marked as failed, I see "Test Result (no failures)" > which is quite confusing > - I have to wait for over an hour to discover the lint failed, which takes > about a minute to run on its own > - The logs are a jumbled mess of all the different tasks running on top of > each other > - The test results give no indication of which version of python they use. 
I > click on Test results, then the test module, then the test class, then I see > 4 tests named the same thing. I assume that the first is python 2.7, the > second is 3.5 and so on. It takes 5 clicks and then reading the log output > to know which version of python a single error pertains to, then I need to > repeat for each failure. This makes it very difficult to discover problems, > and deduce that they may have something to do with python version mismatches. > I believe the solution to this is to split up the single monolithic python > PreCommit job into sub-jobs (possibly using a pipeline with steps). This > would give us the following benefits: > - sub job results should become available as they finish, so for example, > lint results should be available very early on > - sub job results will be reported separately, and there will be a job for > each py2, py35, py36 and so on, so it will be clear when an error is related > to a particular python version > - sub jobs without reports, like docs and lint, will have their own failure > status and logs, so when they fail it will be more obvious what went wrong. > I'm happy to help out once I get some feedback on the desired way forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)
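The sub-job proposal above — independent checks whose results surface as soon as each finishes — can be illustrated with a small stdlib sketch; the check names are stand-ins, and real Beam precommits are Gradle tasks driven by Jenkins, not this script:

```python
# Sketch of the sub-job idea: run independent checks concurrently and record
# each result the moment it completes, instead of one monolithic pass/fail.
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_check(name):
    # Stand-in for invoking a tox environment or lint; returns (name, passed).
    return name, name != 'lint'  # pretend only lint fails, for illustration

checks = ['lint', 'py27', 'py35', 'py36']
results = {}
with ThreadPoolExecutor() as pool:
    futures = {pool.submit(run_check, c): c for c in checks}
    for fut in as_completed(futures):
        name, ok = fut.result()
        results[name] = ok  # available as soon as this sub-job finishes

assert results['lint'] is False
assert all(results[c] for c in ('py27', 'py35', 'py36'))
```

Because a fast check like lint returns in seconds, its failure is visible long before the slow test environments finish — the main usability win the issue asks for.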
[jira] [Work logged] (BEAM-8213) Run and report python tox tasks separately within Jenkins
[ https://issues.apache.org/jira/browse/BEAM-8213?focusedWorklogId=321580=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321580 ] ASF GitHub Bot logged work on BEAM-8213: Author: ASF GitHub Bot Created on: 01/Oct/19 22:07 Start Date: 01/Oct/19 22:07 Worklog Time Spent: 10m Work Description: chadrik commented on issue #9706: [BEAM-8213] Split out lint job from monolithic python preCommit tests on jenkins URL: https://github.com/apache/beam/pull/9706#issuecomment-537253276 R: @youngoli R: @tvalentyn R: @aaltay R: @robertwb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321580) Time Spent: 10h (was: 9h 50m) > Run and report python tox tasks separately within Jenkins > - > > Key: BEAM-8213 > URL: https://issues.apache.org/jira/browse/BEAM-8213 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Chad Dombrova >Priority: Major > Time Spent: 10h > Remaining Estimate: 0h > > As a python developer, the speed and comprehensibility of the jenkins > PreCommit job could be greatly improved. > Here are some of the problems > - when a lint job fails, it's not reported in the test results summary, so > even though the job is marked as failed, I see "Test Result (no failures)" > which is quite confusing > - I have to wait for over an hour to discover the lint failed, which takes > about a minute to run on its own > - The logs are a jumbled mess of all the different tasks running on top of > each other > - The test results give no indication of which version of python they use. I > click on Test results, then the test module, then the test class, then I see > 4 tests named the same thing. I assume that the first is python 2.7, the > second is 3.5 and so on. 
It takes 5 clicks and then reading the log output > to know which version of python a single error pertains to, then I need to > repeat for each failure. This makes it very difficult to discover problems, > and deduce that they may have something to do with python version mismatches. > I believe the solution to this is to split up the single monolithic python > PreCommit job into sub-jobs (possibly using a pipeline with steps). This > would give us the following benefits: > - sub job results should become available as they finish, so for example, > lint results should be available very early on > - sub job results will be reported separately, and there will be a job for > each py2, py35, py36 and so on, so it will be clear when an error is related > to a particular python version > - sub jobs without reports, like docs and lint, will have their own failure > status and logs, so when they fail it will be more obvious what went wrong. > I'm happy to help out once I get some feedback on the desired way forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8213) Run and report python tox tasks separately within Jenkins
[ https://issues.apache.org/jira/browse/BEAM-8213?focusedWorklogId=321579=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321579 ] ASF GitHub Bot logged work on BEAM-8213: Author: ASF GitHub Bot Created on: 01/Oct/19 22:05 Start Date: 01/Oct/19 22:05 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9706: [BEAM-8213] Split out lint job from monolithic python preCommit tests on jenkins URL: https://github.com/apache/beam/pull/9706 Take two. After discovering that #9642 would increase the jenkins pre-commit backlog by ~66% we decided to focus on the aspect of that PR that was a clear winner: splitting out the lint job. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321558=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321558 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 21:39 Start Date: 01/Oct/19 21:39 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#issuecomment-537242868 Run Direct Runner Nexmark Tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321558) Time Spent: 1.5h (was: 1h 20m) > SQL aggregation with where clause fails to plan > --- > > Key: BEAM-6995 > URL: https://issues.apache.org/jira/browse/BEAM-6995 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.11.0 >Reporter: David McIntosh >Assignee: Kirill Kozlov >Priority: Minor > Time Spent: 1.5h > Remaining Estimate: 0h > > I'm finding that this code fails with a CannotPlanException listed below. > {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .build(); > Row row = Row.withSchema(schema).addValues(1, 2).build(); > PCollection inputData = p.apply("row input", > Create.of(row).withRowSchema(schema)); > inputData.apply("sql", > SqlTransform.query( > "SELECT id, SUM(val) " > + "FROM PCOLLECTION " > + "WHERE val > 0 " > + "GROUP BY id"));{code} > If the WHERE clause is removed the code runs successfully. > This may be similar to BEAM-5384 since I was able to work around this by > adding an extra column to the input that isn't reference in the sql. 
> {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .addInt32Field("extra") > .build();{code} > > {code:java} > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state: > Root: rel#100:Subset#2.BEAM_LOGICAL > Original rel: > LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}], > EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows, > 0.0 cpu, 0.0 io}, id = 98 > LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]): > rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96 > BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam, > PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, > 0.0 io}, id = 92 > Sets: > Set#0, type: RecordType(INTEGER id, INTEGER val) > rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, > importance=0.7291 > rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > rel#110:Subset#0.ENUMERABLE, best=rel#109, > importance=0.36455 > > rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#1, type: RecordType(INTEGER id, INTEGER val) > rel#97:Subset#1.NONE, best=null, importance=0.81 > > rel#96:LogicalFilter.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,condition=>($1, > 0)), rowcount=50.0, cumulative cost={inf} > > rel#102:LogicalCalc.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={inf} > rel#104:Subset#1.BEAM_LOGICAL, best=rel#103, importance=0.405 > > 
rel#103:BeamCalcRel.BEAM_LOGICAL(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={150.0 > rows, 801.0 cpu, 0.0 io} > rel#106:Subset#1.ENUMERABLE, best=rel#105, importance=0.405 > > rel#105:BeamEnumerableConverter.ENUMERABLE(input=rel#104:Subset#1.BEAM_LOGICAL), > rowcount=50.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#2, type: RecordType(INTEGER id, INTEGER EXPR$1) > rel#99:Subset#2.NONE, best=null, importance=0.9 >
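Planner dump aside, the failing query in this report is ordinary SQL; the bug lies in Beam's Calcite planning, not in the query shape. A stdlib `sqlite3` sketch showing only the semantics the query is expected to have (filter first, then grouped sum):

```python
# Demonstrate the intended semantics of the failing Beam SQL query using
# sqlite3. This does NOT reproduce the Beam bug; it only shows what
# "SELECT id, SUM(val) ... WHERE val > 0 GROUP BY id" should compute.
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE PCOLLECTION (id INTEGER, val INTEGER)')
conn.executemany('INSERT INTO PCOLLECTION VALUES (?, ?)',
                 [(1, 2), (1, -3), (2, 5)])
rows = conn.execute(
    'SELECT id, SUM(val) FROM PCOLLECTION '
    'WHERE val > 0 GROUP BY id').fetchall()

# Row (1, -3) is filtered out before aggregation, so id 1 sums to 2.
assert sorted(rows) == [(1, 2), (2, 5)]
```

That the same query plans fine once an unreferenced extra column is added supports the reporter's suspicion that this is a projection/planning issue akin to BEAM-5384 rather than a semantic one.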
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321557&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321557 ]

ASF GitHub Bot logged work on BEAM-7760:

Author: ASF GitHub Bot
Created on: 01/Oct/19 21:38
Start Date: 01/Oct/19 21:38
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330288300

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py

## @@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+""" +from __future__ import absolute_import + +import logging + +import apache_beam as beam +from apache_beam.pipeline import PipelineVisitor +from apache_beam.runners.interactive import cache_manager as cache +from apache_beam.runners.interactive import interactive_environment as ie + +READ_CACHE = "_ReadCache_" +WRITE_CACHE = "_WriteCache_" + + +class PipelineInstrument(object): + """A pipeline instrument for pipeline to be executed by interactive runner. + + This module should never depend on underlying runner that interactive runner + delegates. It instruments the original instance of pipeline directly by + appending or replacing transforms with help of cache. It provides + interfaces to recover states of original pipeline. It's the interactive + runner's responsibility to coordinate supported underlying runners to run + the pipeline instrumented and recover the original pipeline states if needed. + """ + + def __init__(self, pipeline, options=None): +self._pipeline = pipeline +# The cache manager should be initiated outside of this module and outside +# of run_pipeline() from interactive runner so that its lifespan could cover +# multiple runs in the interactive environment. Owned by +# interactive_environment module. Not owned by this module. +# TODO(BEAM-7760): change the scope of cache to be owned by runner or +# pipeline result instances because a pipeline is not 1:1 correlated to a +# running job. Only complete and read-only cache is valid across multiple +# jobs. Other cache instances should have their own scopes. Some design +# change should support only runner.run(pipeline) pattern rather than +# pipeline.run([runner]) and a runner can only run at most one pipeline at a +# time. Otherwise, result returned by run() is the only 1:1 anchor. +self._cache_manager = ie.current_env().cache_manager() + +# Invoke a round trip through the runner API. This makes sure the Pipeline +# proto is stable. 
The snapshot of pipeline will not be mutated within this +# module and can be used to recover original pipeline if needed. +self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( +pipeline.to_runner_api(use_fake_coders=True), +pipeline.runner, +options) +# Snapshot of original pipeline information. +(self._original_pipeline_proto, + self._original_context) = self._pipeline_snap.to_runner_api( + return_context=True, use_fake_coders=True) + +# All compute-once-against-original-pipeline fields. +self._has_unbounded_source = has_unbounded_source(self._pipeline_snap) +# TODO(BEAM-7760): once cache scope changed, this is not needed to manage +# relationships across pipelines, runners, and jobs. +self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap, + self._original_context) + +# A mapping from PCollection id to python id() value in user defined +# pipeline instance. +(self._pcoll_version_map, + self._cacheables) = cacheables(self.pcolls_to_pcoll_id()) + +# A dict from cache key to PCollection that is read from cache. +# If
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321550&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321550 ]

ASF GitHub Bot logged work on BEAM-7760:

Author: ASF GitHub Bot
Created on: 01/Oct/19 21:31
Start Date: 01/Oct/19 21:31
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330285605

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py

## @@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+""" +from __future__ import absolute_import + +import logging + +import apache_beam as beam +from apache_beam.pipeline import PipelineVisitor +from apache_beam.runners.interactive import cache_manager as cache +from apache_beam.runners.interactive import interactive_environment as ie + +READ_CACHE = "_ReadCache_" +WRITE_CACHE = "_WriteCache_" + + +class PipelineInstrument(object): + """A pipeline instrument for pipeline to be executed by interactive runner. + + This module should never depend on underlying runner that interactive runner + delegates. It instruments the original instance of pipeline directly by + appending or replacing transforms with help of cache. It provides + interfaces to recover states of original pipeline. It's the interactive + runner's responsibility to coordinate supported underlying runners to run + the pipeline instrumented and recover the original pipeline states if needed. + """ + + def __init__(self, pipeline, options=None): +self._pipeline = pipeline +# The cache manager should be initiated outside of this module and outside +# of run_pipeline() from interactive runner so that its lifespan could cover +# multiple runs in the interactive environment. Owned by +# interactive_environment module. Not owned by this module. +# TODO(BEAM-7760): change the scope of cache to be owned by runner or +# pipeline result instances because a pipeline is not 1:1 correlated to a +# running job. Only complete and read-only cache is valid across multiple +# jobs. Other cache instances should have their own scopes. Some design +# change should support only runner.run(pipeline) pattern rather than +# pipeline.run([runner]) and a runner can only run at most one pipeline at a +# time. Otherwise, result returned by run() is the only 1:1 anchor. +self._cache_manager = ie.current_env().cache_manager() + +# Invoke a round trip through the runner API. This makes sure the Pipeline +# proto is stable. 
The snapshot of pipeline will not be mutated within this +# module and can be used to recover original pipeline if needed. +self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( +pipeline.to_runner_api(use_fake_coders=True), +pipeline.runner, +options) +# Snapshot of original pipeline information. +(self._original_pipeline_proto, + self._original_context) = self._pipeline_snap.to_runner_api( + return_context=True, use_fake_coders=True) + +# All compute-once-against-original-pipeline fields. +self._has_unbounded_source = has_unbounded_source(self._pipeline_snap) +# TODO(BEAM-7760): once cache scope changed, this is not needed to manage +# relationships across pipelines, runners, and jobs. +self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap, + self._original_context) + +# A mapping from PCollection id to python id() value in user defined +# pipeline instance. +(self._pcoll_version_map, + self._cacheables) = cacheables(self.pcolls_to_pcoll_id()) + +# A dict from cache key to PCollection that is read from cache. +# If
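The snapshot technique discussed in the review above — a round trip through a serialization API to obtain a stable copy that later mutations of the original cannot touch — can be illustrated with plain Python. This is only a sketch of the idea: `FakePipeline`, `to_proto`, and `from_proto` are hypothetical stand-ins, not Beam's actual `Pipeline.to_runner_api`/`from_runner_api` calls.

```python
import json

class FakePipeline:
    """Hypothetical stand-in for a pipeline object (not Beam's Pipeline)."""

    def __init__(self, transforms):
        self.transforms = transforms

    def to_proto(self):
        # Serialize to a stable, self-contained representation.
        return json.dumps({"transforms": self.transforms}, sort_keys=True)

    @classmethod
    def from_proto(cls, proto):
        # Rebuild a fresh object that shares no state with the original.
        return cls(json.loads(proto)["transforms"])

original = FakePipeline(["Read", "Map", "Write"])
# Round trip: the snapshot is fully decoupled from the original...
snapshot = FakePipeline.from_proto(original.to_proto())
# ...so instrumenting the original (e.g. appending a cache write)
# leaves the snapshot intact and usable for recovery.
original.transforms.append("_WriteCache_")
print(snapshot.transforms)  # ['Read', 'Map', 'Write']
```

This mirrors why the constructor round-trips through the runner API before instrumenting: the snapshot stays a faithful record of the user's original pipeline.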
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321537=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321537 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 21:20 Start Date: 01/Oct/19 21:20 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#discussion_r330281262 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java ## @@ -40,15 +50,21 @@ public BeamBasicAggregationRule( Class aggregateClass, RelBuilderFactory relBuilderFactory) { -super(operand(aggregateClass, operand(TableScan.class, any())), relBuilderFactory, null); +super(operand(aggregateClass, operand(AbstractRelNode.class, any())), relBuilderFactory, null); Review comment: You are correct, updated match condition to use RelNode instead This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321537) Time Spent: 1h (was: 50m) > SQL aggregation with where clause fails to plan > --- > > Key: BEAM-6995 > URL: https://issues.apache.org/jira/browse/BEAM-6995 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.11.0 >Reporter: David McIntosh >Assignee: Kirill Kozlov >Priority: Minor > Time Spent: 1h > Remaining Estimate: 0h > > I'm finding that this code fails with a CannotPlanException listed below. 
> {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .build(); > Row row = Row.withSchema(schema).addValues(1, 2).build(); > PCollection inputData = p.apply("row input", > Create.of(row).withRowSchema(schema)); > inputData.apply("sql", > SqlTransform.query( > "SELECT id, SUM(val) " > + "FROM PCOLLECTION " > + "WHERE val > 0 " > + "GROUP BY id"));{code} > If the WHERE clause is removed the code runs successfully. > This may be similar to BEAM-5384 since I was able to work around this by > adding an extra column to the input that isn't reference in the sql. > {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .addInt32Field("extra") > .build();{code} > > {code:java} > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state: > Root: rel#100:Subset#2.BEAM_LOGICAL > Original rel: > LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}], > EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows, > 0.0 cpu, 0.0 io}, id = 98 > LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]): > rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96 > BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam, > PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, > 0.0 io}, id = 92 > Sets: > Set#0, type: RecordType(INTEGER id, INTEGER val) > rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, > importance=0.7291 > rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > rel#110:Subset#0.ENUMERABLE, best=rel#109, > importance=0.36455 > > rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 
1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#1, type: RecordType(INTEGER id, INTEGER val) > rel#97:Subset#1.NONE, best=null, importance=0.81 > > rel#96:LogicalFilter.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,condition=>($1, > 0)), rowcount=50.0, cumulative cost={inf} > > rel#102:LogicalCalc.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={inf} > rel#104:Subset#1.BEAM_LOGICAL, best=rel#103, importance=0.405 > > rel#103:BeamCalcRel.BEAM_LOGICAL(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1,
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321539=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321539 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 21:20 Start Date: 01/Oct/19 21:20 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#discussion_r330281513 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java ## @@ -40,15 +50,21 @@ public BeamBasicAggregationRule( Class aggregateClass, RelBuilderFactory relBuilderFactory) { -super(operand(aggregateClass, operand(TableScan.class, any())), relBuilderFactory, null); +super(operand(aggregateClass, operand(AbstractRelNode.class, any())), relBuilderFactory, null); } @Override public void onMatch(RelOptRuleCall call) { Aggregate aggregate = call.rel(0); -TableScan tableScan = call.rel(1); +AbstractRelNode relNode = call.rel(1); -RelNode newTableScan = tableScan.copy(tableScan.getTraitSet(), tableScan.getInputs()); +if (relNode instanceof Project || relNode instanceof Calc || relNode instanceof Filter) { + if (isWindowed(relNode) || hasWindowedParents(relNode)) { +return; Review comment: Added a comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321539) Time Spent: 1h 10m (was: 1h) > SQL aggregation with where clause fails to plan > --- > > Key: BEAM-6995 > URL: https://issues.apache.org/jira/browse/BEAM-6995 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.11.0 >Reporter: David McIntosh >Assignee: Kirill Kozlov >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > I'm finding that this code fails with a CannotPlanException listed below. > {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .build(); > Row row = Row.withSchema(schema).addValues(1, 2).build(); > PCollection inputData = p.apply("row input", > Create.of(row).withRowSchema(schema)); > inputData.apply("sql", > SqlTransform.query( > "SELECT id, SUM(val) " > + "FROM PCOLLECTION " > + "WHERE val > 0 " > + "GROUP BY id"));{code} > If the WHERE clause is removed the code runs successfully. > This may be similar to BEAM-5384 since I was able to work around this by > adding an extra column to the input that isn't reference in the sql. 
> {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .addInt32Field("extra") > .build();{code} > > {code:java} > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state: > Root: rel#100:Subset#2.BEAM_LOGICAL > Original rel: > LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}], > EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows, > 0.0 cpu, 0.0 io}, id = 98 > LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]): > rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96 > BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam, > PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, > 0.0 io}, id = 92 > Sets: > Set#0, type: RecordType(INTEGER id, INTEGER val) > rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, > importance=0.7291 > rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > rel#110:Subset#0.ENUMERABLE, best=rel#109, > importance=0.36455 > > rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#1, type: RecordType(INTEGER id, INTEGER val) > rel#97:Subset#1.NONE, best=null, importance=0.81 > > rel#96:LogicalFilter.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,condition=>($1, > 0)), rowcount=50.0,
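The `onMatch` change discussed in this thread follows a common planner-rule pattern: match broadly (any `RelNode` input instead of only `TableScan`), then bail out early on cases another rule is expected to handle. A minimal Python sketch of that guard logic follows; the `Node` class, `is_windowed`, and `has_windowed_parents` are illustrative stand-ins, not Calcite's actual API.

```python
class Node:
    """Hypothetical relational node; not a Calcite RelNode."""

    def __init__(self, kind, windowed=False, parents=()):
        self.kind = kind
        self.windowed = windowed
        self.parents = list(parents)

def is_windowed(node):
    return node.windowed

def has_windowed_parents(node):
    # Walk up the tree looking for any windowed ancestor.
    return any(is_windowed(p) or has_windowed_parents(p) for p in node.parents)

def basic_aggregation_applies(node):
    """Guard sketch: skip Project/Calc/Filter inputs that are windowed,
    since that case would be handled by a windowed-aggregation rule."""
    if node.kind in ("Project", "Calc", "Filter"):
        if is_windowed(node) or has_windowed_parents(node):
            return False
    return True

print(basic_aggregation_applies(Node("Filter")))                 # True
print(basic_aggregation_applies(Node("Filter", windowed=True)))  # False
```

Matching broadly and returning early keeps the rule's trigger condition simple while still partitioning responsibility cleanly between the windowed and non-windowed aggregation rules.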
[jira] [Work logged] (BEAM-8334) Expose Language Options for testing
[ https://issues.apache.org/jira/browse/BEAM-8334?focusedWorklogId=321536=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321536 ] ASF GitHub Bot logged work on BEAM-8334: Author: ASF GitHub Bot Created on: 01/Oct/19 21:17 Start Date: 01/Oct/19 21:17 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9704: [BEAM-8334] Expose Language Options for testing URL: https://github.com/apache/beam/pull/9704#issuecomment-537234450 R: @amaliujia This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321536) Time Spent: 20m (was: 10m) > Expose Language Options for testing > --- > > Key: BEAM-8334 > URL: https://issues.apache.org/jira/browse/BEAM-8334 > Project: Beam > Issue Type: New Feature > Components: dsl-sql-zetasql >Reporter: Andrew Pilloud >Assignee: Andrew Pilloud >Priority: Trivial > Time Spent: 20m > Remaining Estimate: 0h > > Google has a set of compliance tests for ZetaSQL. The test framework needs > access to LanguageOptions to determine what tests are supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321532=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321532 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 21:09 Start Date: 01/Oct/19 21:09 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330201369 ## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ## @@ -0,0 +1,470 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module to instrument interactivity to the given pipeline. + +For internal use only; no backwards-compatibility guarantees. +This module accesses current interactive environment and analyzes given pipeline +to transform original pipeline into a one-shot pipeline with interactivity. 
+""" +from __future__ import absolute_import + +import logging + +import apache_beam as beam +from apache_beam.pipeline import PipelineVisitor +from apache_beam.runners.interactive import cache_manager as cache +from apache_beam.runners.interactive import interactive_environment as ie + +READ_CACHE = "_ReadCache_" +WRITE_CACHE = "_WriteCache_" + + +class PipelineInstrument(object): + """A pipeline instrument for pipeline to be executed by interactive runner. + + This module should never depend on underlying runner that interactive runner + delegates. It instruments the original instance of pipeline directly by + appending or replacing transforms with help of cache. It provides + interfaces to recover states of original pipeline. It's the interactive + runner's responsibility to coordinate supported underlying runners to run + the pipeline instrumented and recover the original pipeline states if needed. + """ + + def __init__(self, pipeline, options=None): +self._pipeline = pipeline +# The cache manager should be initiated outside of this module and outside +# of run_pipeline() from interactive runner so that its lifespan could cover +# multiple runs in the interactive environment. Owned by +# interactive_environment module. Not owned by this module. +# TODO(BEAM-7760): change the scope of cache to be owned by runner or +# pipeline result instances because a pipeline is not 1:1 correlated to a +# running job. Only complete and read-only cache is valid across multiple +# jobs. Other cache instances should have their own scopes. Some design +# change should support only runner.run(pipeline) pattern rather than +# pipeline.run([runner]) and a runner can only run at most one pipeline at a +# time. Otherwise, result returned by run() is the only 1:1 anchor. +self._cache_manager = ie.current_env().cache_manager() + +# Invoke a round trip through the runner API. This makes sure the Pipeline +# proto is stable. 
The snapshot of pipeline will not be mutated within this +# module and can be used to recover original pipeline if needed. +self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( +pipeline.to_runner_api(use_fake_coders=True), +pipeline.runner, +options) +# Snapshot of original pipeline information. +(self._original_pipeline_proto, + self._original_context) = self._pipeline_snap.to_runner_api( + return_context=True, use_fake_coders=True) + +# All compute-once-against-original-pipeline fields. +self._has_unbounded_source = has_unbounded_source(self._pipeline_snap) +# TODO(BEAM-7760): once cache scope changed, this is not needed to manage +# relationships across pipelines, runners, and jobs. +self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap, + self._original_context) + +# A mapping from PCollection id to python id() value in user defined +# pipeline instance. +(self._pcoll_version_map, + self._cacheables) = cacheables(self.pcolls_to_pcoll_id()) + +# A dict from cache key to PCollection that is read from cache. +# If
[jira] [Work logged] (BEAM-8146) SchemaCoder/RowCoder have no equals() function
[ https://issues.apache.org/jira/browse/BEAM-8146?focusedWorklogId=321530=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321530 ] ASF GitHub Bot logged work on BEAM-8146: Author: ASF GitHub Bot Created on: 01/Oct/19 21:07 Start Date: 01/Oct/19 21:07 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #9493: [BEAM-8146,BEAM-8204,BEAM-8205] Add equals and hashCode to SchemaCoder and RowCoder URL: https://github.com/apache/beam/pull/9493#issuecomment-537231094 Thing to watch out for is update (in)compatibility. It happened once when a contribution made a good refactor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321530) Time Spent: 2h 10m (was: 2h) > SchemaCoder/RowCoder have no equals() function > -- > > Key: BEAM-8146 > URL: https://issues.apache.org/jira/browse/BEAM-8146 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.15.0 >Reporter: Brian Hulette >Assignee: Brian Hulette >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > SchemaCoder has no equals function, so it can't be compared in tests, like > CloudComponentsTests$DefaultCoders, which is being re-enabled in > https://github.com/apache/beam/pull/9446 -- This message was sent by Atlassian Jira (v8.3.4#803005)
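The PR in this thread adds `equals` and `hashCode` to Java's `SchemaCoder`/`RowCoder` so coders can be compared in tests. The contract it must satisfy — equal coders must produce equal hash codes — can be sketched in Python with `__eq__`/`__hash__`. The `SchemaCoderish` class below is hypothetical, keyed only by field names for illustration.

```python
class SchemaCoderish:
    """Hypothetical coder keyed by its schema; not Beam's SchemaCoder."""

    def __init__(self, field_names):
        self.field_names = tuple(field_names)

    def __eq__(self, other):
        return (isinstance(other, SchemaCoderish)
                and self.field_names == other.field_names)

    def __hash__(self):
        # Must be consistent with __eq__: equal coders hash identically.
        return hash(("SchemaCoderish", self.field_names))

a = SchemaCoderish(["id", "val"])
b = SchemaCoderish(["id", "val"])
c = SchemaCoderish(["id"])
print(a == b, hash(a) == hash(b))  # True True
print(a == c)                      # False
```

This is what lets a test like `CloudComponentsTests$DefaultCoders` assert coder equality directly; the update-compatibility caution in the comment above applies because any change to the fields that feed equality can silently alter which coders are considered interchangeable.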
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321529&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321529 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 21:04 Start Date: 01/Oct/19 21:04 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330266912
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive import interactive_environment as ie
+
+READ_CACHE = "_ReadCache_"
+WRITE_CACHE = "_WriteCache_"
+
+
+class PipelineInstrument(object):
+  """A pipeline instrument for pipeline to be executed by interactive runner.
+
+  This module should never depend on underlying runner that interactive runner
+  delegates. It instruments the original instance of pipeline directly by
+  appending or replacing transforms with help of cache. It provides
+  interfaces to recover states of original pipeline. It's the interactive
+  runner's responsibility to coordinate supported underlying runners to run
+  the pipeline instrumented and recover the original pipeline states if needed.
+  """
+
+  def __init__(self, pipeline, options=None):
+    self._pipeline = pipeline
+    # The cache manager should be initiated outside of this module and outside
+    # of run_pipeline() from interactive runner so that its lifespan could cover
+    # multiple runs in the interactive environment. Owned by
+    # interactive_environment module. Not owned by this module.
+    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
+    # pipeline result instances because a pipeline is not 1:1 correlated to a
+    # running job. Only complete and read-only cache is valid across multiple
+    # jobs. Other cache instances should have their own scopes. Some design
+    # change should support only runner.run(pipeline) pattern rather than
+    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
+    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    self._cache_manager = ie.current_env().cache_manager()
+
+    # Invoke a round trip through the runner API. This makes sure the Pipeline
+    # proto is stable. The snapshot of pipeline will not be mutated within this
+    # module and can be used to recover original pipeline if needed.
+    self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
+        pipeline.to_runner_api(use_fake_coders=True),
+        pipeline.runner,
+        options)
+    # Snapshot of original pipeline information.
+    (self._original_pipeline_proto,
+     self._original_context) = self._pipeline_snap.to_runner_api(
+         return_context=True, use_fake_coders=True)
+
+    # All compute-once-against-original-pipeline fields.
+    self._has_unbounded_source = has_unbounded_source(self._pipeline_snap)
+    # TODO(BEAM-7760): once cache scope changed, this is not needed to manage
+    # relationships across pipelines, runners, and jobs.
+    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap,
+                                                  self._original_context)
+
+    # A mapping from PCollection id to python id() value in user defined
+    # pipeline instance.
+    (self._pcoll_version_map,
+     self._cacheables) = cacheables(self.pcolls_to_pcoll_id())
+
+    # A dict from cache key to PCollection that is read from cache.
+    # If
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321527&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321527 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 21:02 Start Date: 01/Oct/19 21:02 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#discussion_r330273314
## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java ##
@@ -40,15 +50,21 @@
   public BeamBasicAggregationRule(
       Class aggregateClass, RelBuilderFactory relBuilderFactory) {
-    super(operand(aggregateClass, operand(TableScan.class, any())), relBuilderFactory, null);
+    super(operand(aggregateClass, operand(AbstractRelNode.class, any())), relBuilderFactory, null);
   }

   @Override
   public void onMatch(RelOptRuleCall call) {
     Aggregate aggregate = call.rel(0);
-    TableScan tableScan = call.rel(1);
+    AbstractRelNode relNode = call.rel(1);

-    RelNode newTableScan = tableScan.copy(tableScan.getTraitSet(), tableScan.getInputs());
+    if (relNode instanceof Project || relNode instanceof Calc || relNode instanceof Filter) {
+      if (isWindowed(relNode) || hasWindowedParents(relNode)) {
+        return;

Review comment: Probably worth adding a comment here that this case is expected to be handled by `BeamAggregationRule`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321527) Time Spent: 40m (was: 0.5h)
> SQL aggregation with where clause fails to plan
> ---
>
> Key: BEAM-6995
> URL: https://issues.apache.org/jira/browse/BEAM-6995
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Affects Versions: 2.11.0
> Reporter: David McIntosh
> Assignee: Kirill Kozlov
> Priority: Minor
> Time Spent: 40m
> Remaining Estimate: 0h
>
> I'm finding that this code fails with a CannotPlanException listed below.
> {code:java}
> Schema schema = Schema.builder()
>     .addInt32Field("id")
>     .addInt32Field("val")
>     .build();
> Row row = Row.withSchema(schema).addValues(1, 2).build();
> PCollection<Row> inputData = p.apply("row input",
>     Create.of(row).withRowSchema(schema));
> inputData.apply("sql",
>     SqlTransform.query(
>         "SELECT id, SUM(val) "
>         + "FROM PCOLLECTION "
>         + "WHERE val > 0 "
>         + "GROUP BY id"));{code}
> If the WHERE clause is removed the code runs successfully.
> This may be similar to BEAM-5384 since I was able to work around this by
> adding an extra column to the input that isn't referenced in the sql.
> {code:java}
> Schema schema = Schema.builder()
>     .addInt32Field("id")
>     .addInt32Field("val")
>     .addInt32Field("extra")
>     .build();{code}
>
> {code:java}
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state:
> Root: rel#100:Subset#2.BEAM_LOGICAL
> Original rel:
> LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}],
> EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows,
> 0.0 cpu, 0.0 io}, id = 98
> LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]):
> rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96
> BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam,
> PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu,
> 0.0 io}, id = 92
> Sets:
> Set#0, type: RecordType(INTEGER id, INTEGER val)
> rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, importance=0.7291
> rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
> rel#110:Subset#0.ENUMERABLE, best=rel#109, importance=0.36455
> rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> Set#1, type: RecordType(INTEGER id, INTEGER val)
> rel#97:Subset#1.NONE, best=null, importance=0.81
> >
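The fix under review above makes BeamBasicAggregationRule match Aggregate over a Project/Calc/Filter input but return early when windowing is involved, deferring that case to BeamAggregationRule. A minimal Python sketch of the same early-return guard pattern (all class and function names here are illustrative stand-ins, not Beam's actual Java API):

```python
# Hypothetical stand-in for a Calcite-style planner rule guard: the basic
# (non-windowed) aggregation rule applies only when neither the input node
# nor any of its upstream inputs uses windowing.

class RelNode:
    """Minimal stand-in for a relational plan node."""
    def __init__(self, kind, windowed=False, inputs=()):
        self.kind = kind          # e.g. "Project", "Filter", "TableScan"
        self.windowed = windowed  # does this node apply windowing?
        self.inputs = list(inputs)

def has_windowed_parents(node):
    """True if any upstream input of the node applies windowing."""
    return any(n.windowed or has_windowed_parents(n) for n in node.inputs)

def basic_aggregation_rule_matches(input_node):
    """Early-return guard: bail out for windowed Project/Calc/Filter inputs;
    that case is expected to be handled by the windowed aggregation rule."""
    if input_node.kind in ("Project", "Calc", "Filter"):
        if input_node.windowed or has_windowed_parents(input_node):
            return False
    return True

scan = RelNode("TableScan")
plain_filter = RelNode("Filter", inputs=[scan])
windowed_project = RelNode("Project", windowed=True, inputs=[scan])

print(basic_aggregation_rule_matches(plain_filter))      # True
print(basic_aggregation_rule_matches(windowed_project))  # False
```

The design point mirrored here is that two rules cover disjoint cases: the guard keeps the basic rule from firing on plans the windowed rule owns, which avoids the planner producing no implementable alternative.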
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321528&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321528 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 21:02 Start Date: 01/Oct/19 21:02 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#discussion_r330272966
## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java ##
@@ -40,15 +50,21 @@
   public BeamBasicAggregationRule(
       Class aggregateClass, RelBuilderFactory relBuilderFactory) {
-    super(operand(aggregateClass, operand(TableScan.class, any())), relBuilderFactory, null);
+    super(operand(aggregateClass, operand(AbstractRelNode.class, any())), relBuilderFactory, null);

Review comment: Looking at examples of this in Calcite, I think `RelNode` is preferable to `AbstractRelNode`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321528) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321523&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321523 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 20:57 Start Date: 01/Oct/19 20:57 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330202786
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321522&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321522 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 20:54 Start Date: 01/Oct/19 20:54 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#issuecomment-537226095 Run Direct Runner Nexmark Tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321522) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321521&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321521 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 20:53 Start Date: 01/Oct/19 20:53 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330197374
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##

Review comment: Inside this module, the pipeline instance is actually modified (with read/write cache transforms). This approach takes a snapshot of the original pipeline before mutating it so that the invoking [Interactive]Runner can always recover the original pipeline. Note: the pipeline instance received by this module is from the runner. It's highly possible that a round-trip between pipeline and runner_api proto has been done by the runner itself. For example, the pipeline instance the notebook user has defined (and continue developing) in their notebook will not be the same instance (but a copy) used during run_pipeline(pipeline) by InteractiveRunner. This module will receive that copied pipeline instance,
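The snapshot-and-recover approach described in the review comment above can be sketched with a simplified stand-in. Beam's real code round-trips through the runner API (`pipeline.to_runner_api()` / `Pipeline.from_runner_api()`) to obtain a stable copy; here `copy.deepcopy` plays the role of that round trip, and the `Pipeline` class and transform names are illustrative, not Beam's actual classes:

```python
# Sketch of snapshot-before-mutate: take an untouched copy of the user's
# pipeline, instrument the original in place (like inserting read/write
# cache transforms named "_ReadCache_"/"_WriteCache_" in the source), and
# restore from the snapshot when the original pipeline state is needed.
import copy

class Pipeline:
    """Illustrative stand-in for a user-defined pipeline."""
    def __init__(self, transforms):
        self.transforms = list(transforms)

def instrument(pipeline):
    """Mutate the pipeline in place, returning a pre-mutation snapshot."""
    snapshot = copy.deepcopy(pipeline)  # stand-in for the runner-API round trip
    pipeline.transforms.insert(0, "_ReadCache_")
    pipeline.transforms.append("_WriteCache_")
    return snapshot

def recover(pipeline, snapshot):
    """Restore the original user-defined pipeline from the snapshot."""
    pipeline.transforms = list(snapshot.transforms)

p = Pipeline(["Create", "Map", "Sum"])
snap = instrument(p)
print(p.transforms)   # ['_ReadCache_', 'Create', 'Map', 'Sum', '_WriteCache_']
recover(p, snap)
print(p.transforms)   # ['Create', 'Map', 'Sum']
```

The key property, as the comment notes, is that the snapshot is taken before any mutation, so the invoking runner can always hand the user back the pipeline they defined, regardless of what instrumentation ran.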
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321519 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 20:47 Start Date: 01/Oct/19 20:47 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330201369
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321518&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321518 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 20:46 Start Date: 01/Oct/19 20:46 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330266912
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
## @@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive import interactive_environment as ie
+
+READ_CACHE = "_ReadCache_"
+WRITE_CACHE = "_WriteCache_"
+
+
+class PipelineInstrument(object):
+  """A pipeline instrument for pipeline to be executed by interactive runner.
+
+  This module should never depend on underlying runner that interactive runner
+  delegates. It instruments the original instance of pipeline directly by
+  appending or replacing transforms with help of cache. It provides
+  interfaces to recover states of original pipeline. It's the interactive
+  runner's responsibility to coordinate supported underlying runners to run
+  the pipeline instrumented and recover the original pipeline states if needed.
+  """
+
+  def __init__(self, pipeline, options=None):
+    self._pipeline = pipeline
+    # The cache manager should be initiated outside of this module and outside
+    # of run_pipeline() from interactive runner so that its lifespan could cover
+    # multiple runs in the interactive environment. Owned by
+    # interactive_environment module. Not owned by this module.
+    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
+    # pipeline result instances because a pipeline is not 1:1 correlated to a
+    # running job. Only complete and read-only cache is valid across multiple
+    # jobs. Other cache instances should have their own scopes. Some design
+    # change should support only runner.run(pipeline) pattern rather than
+    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
+    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    self._cache_manager = ie.current_env().cache_manager()
+
+    # Invoke a round trip through the runner API. This makes sure the Pipeline
+    # proto is stable. The snapshot of pipeline will not be mutated within this
+    # module and can be used to recover original pipeline if needed.
+    self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
+        pipeline.to_runner_api(use_fake_coders=True),
+        pipeline.runner,
+        options)
+    # Snapshot of original pipeline information.
+    (self._original_pipeline_proto,
+     self._original_context) = self._pipeline_snap.to_runner_api(
+         return_context=True, use_fake_coders=True)
+
+    # All compute-once-against-original-pipeline fields.
+    self._has_unbounded_source = has_unbounded_source(self._pipeline_snap)
+    # TODO(BEAM-7760): once cache scope changed, this is not needed to manage
+    # relationships across pipelines, runners, and jobs.
+    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap,
+                                                  self._original_context)
+
+    # A mapping from PCollection id to python id() value in user defined
+    # pipeline instance.
+    (self._pcoll_version_map,
+     self._cacheables) = cacheables(self.pcolls_to_pcoll_id())
+
+    # A dict from cache key to PCollection that is read from cache.
+    # If
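The constructor quoted above snapshots the pipeline by a round trip through the runner API, so later instrumentation cannot mutate the snapshot. A minimal Python sketch of that idea, using a plain dict and `copy.deepcopy` as hypothetical stand-ins for `to_runner_api()`/`from_runner_api()` (this is not Beam's actual API surface):

```python
import copy

# Hypothetical stand-in for a runner-API round trip: "serializing" the
# pipeline and materializing it again yields a snapshot that later
# mutations of the original object cannot touch.
original = {"transforms": ["Read", "Map"], "options": {}}

# Round trip (to_runner_api -> from_runner_api), modeled as a deep copy.
snapshot = copy.deepcopy(original)

# Instrumentation mutates the original pipeline in place...
original["transforms"].append("_WriteCache_")

# ...but the snapshot still describes the pre-instrumentation pipeline,
# so it can be used to recover the original state if needed.
assert snapshot["transforms"] == ["Read", "Map"]
assert original["transforms"] == ["Read", "Map", "_WriteCache_"]
```

The deep copy models only the isolation property; the real round trip additionally normalizes the Pipeline proto so it is stable across serializations.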
[jira] [Work logged] (BEAM-8287) Update documentation for Python 3 support after Beam 2.16.0.
[ https://issues.apache.org/jira/browse/BEAM-8287?focusedWorklogId=321514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321514 ] ASF GitHub Bot logged work on BEAM-8287: Author: ASF GitHub Bot Created on: 01/Oct/19 20:43 Start Date: 01/Oct/19 20:43 Worklog Time Spent: 10m Work Description: soyrice commented on issue #9700: [BEAM-8287] Python3 GA docs updates URL: https://github.com/apache/beam/pull/9700#issuecomment-537221631 Staging links: http://apache-beam-website-pull-requests.storage.googleapis.com/9700/get-started/quickstart-py/index.html http://apache-beam-website-pull-requests.storage.googleapis.com/9700/documentation/programming-guide/index.html http://apache-beam-website-pull-requests.storage.googleapis.com/9700/roadmap/python-sdk/index.html This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321514) Time Spent: 50m (was: 40m) > Update documentation for Python 3 support after Beam 2.16.0. > > > Key: BEAM-8287 > URL: https://issues.apache.org/jira/browse/BEAM-8287 > Project: Beam > Issue Type: Sub-task > Components: website >Reporter: Valentyn Tymofieiev >Assignee: Cyrus Maden >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8334) Expose Language Options for testing
[ https://issues.apache.org/jira/browse/BEAM-8334?focusedWorklogId=321507=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321507 ] ASF GitHub Bot logged work on BEAM-8334: Author: ASF GitHub Bot Created on: 01/Oct/19 20:34 Start Date: 01/Oct/19 20:34 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #9704: [BEAM-8334] Expose Language Options for testing URL: https://github.com/apache/beam/pull/9704 This exposes Language Options for testing. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
Post-Commit Tests Status (on master branch): table of Jenkins build badges for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (badge links omitted; the table is truncated in the original message).
[jira] [Assigned] (BEAM-8291) org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone
[ https://issues.apache.org/jira/browse/BEAM-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Brian Hulette reassigned BEAM-8291: --- Assignee: Brian Hulette > org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone > --- > > Key: BEAM-8291 > URL: https://issues.apache.org/jira/browse/BEAM-8291 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Brian Hulette >Assignee: Brian Hulette >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > I noticed that AvroSchemaTest.{testRowToSpecificRecord, testRowToPojo} are > failing on my local machine, but don't seem to fail on Jenkins. I looked into > it a bit and it seems to be because my default timezone is > America/Los_Angeles, but the ROW_FOR_POJO is initialized with an Instant at > the start of the day in UTC -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8291) org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone
[ https://issues.apache.org/jira/browse/BEAM-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Brian Hulette closed BEAM-8291. --- Fix Version/s: 2.17.0 Resolution: Fixed > org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone > --- > > Key: BEAM-8291 > URL: https://issues.apache.org/jira/browse/BEAM-8291 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Brian Hulette >Assignee: Brian Hulette >Priority: Major > Fix For: 2.17.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > I noticed that AvroSchemaTest.{testRowToSpecificRecord, testRowToPojo} are > failing on my local machine, but don't seem to fail on Jenkins. I looked into > it a bit and it seems to be because my default timezone is > America/Los_Angeles, but the ROW_FOR_POJO is initialized with an Instant at > the start of the day in UTC -- This message was sent by Atlassian Jira (v8.3.4#803005)
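The root cause described in BEAM-8291 can be reproduced outside Beam. A small Python sketch (the actual test is Java/Joda-Time; the dates here are illustrative) showing that an instant pinned to the start of a day in UTC falls on the previous calendar date in America/Los_Angeles:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# An instant fixed at the start of a day in UTC (as ROW_FOR_POJO is)...
utc_midnight = datetime(2019, 10, 1, tzinfo=timezone.utc)

# ...is still the previous calendar date in America/Los_Angeles, so an
# assertion that renders it in the system default timezone passes on a
# UTC Jenkins worker but fails on a Pacific-time machine.
local = utc_midnight.astimezone(ZoneInfo("America/Los_Angeles"))

assert utc_midnight.date().isoformat() == "2019-10-01"
assert local.date().isoformat() == "2019-09-30"
```

The usual fix is to compare instants (epoch values) or to pin an explicit zone in the test fixture rather than relying on the system default.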
[jira] [Work logged] (BEAM-8287) Update documentation for Python 3 support after Beam 2.16.0.
[ https://issues.apache.org/jira/browse/BEAM-8287?focusedWorklogId=321505=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321505 ] ASF GitHub Bot logged work on BEAM-8287: Author: ASF GitHub Bot Created on: 01/Oct/19 20:32 Start Date: 01/Oct/19 20:32 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #9700: [BEAM-8287] Python3 GA docs updates URL: https://github.com/apache/beam/pull/9700#discussion_r330260969 ## File path: website/src/documentation/programming-guide.md ## @@ -39,6 +39,9 @@ how to implement Beam concepts in your pipelines. +{:.language-py} +New versions of the Python SDK will only support Python 3.5 or higher. Currently, the Python SDK still supports Python 2.7.x. We recommend using the latest Python 3 version. Review comment: How about the following wording: New releases of the Python SDK will soon require Python 3.5 or higher. We recommend that new users start using Beam with Python 3. Or: Currently, the Python SDK still supports Python 2.7.x, however new releases of the Python SDK will soon require Python 3.5 or higher. We recommend that new users start using Beam with Python 3. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321505) Time Spent: 40m (was: 0.5h) > Update documentation for Python 3 support after Beam 2.16.0. > > > Key: BEAM-8287 > URL: https://issues.apache.org/jira/browse/BEAM-8287 > Project: Beam > Issue Type: Sub-task > Components: website >Reporter: Valentyn Tymofieiev >Assignee: Cyrus Maden >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321506&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321506 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 20:32 Start Date: 01/Oct/19 20:32 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330201369
## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py [duplicate quoted excerpt of this file omitted]
[jira] [Created] (BEAM-8334) Expose Language Options for testing
Andrew Pilloud created BEAM-8334: Summary: Expose Language Options for testing Key: BEAM-8334 URL: https://issues.apache.org/jira/browse/BEAM-8334 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Andrew Pilloud Assignee: Andrew Pilloud Google has a set of compliance tests for ZetaSQL. The test framework needs access to LanguageOptions to determine what tests are supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8334) Expose Language Options for testing
[ https://issues.apache.org/jira/browse/BEAM-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Pilloud updated BEAM-8334: - Status: Open (was: Triage Needed) > Expose Language Options for testing > --- > > Key: BEAM-8334 > URL: https://issues.apache.org/jira/browse/BEAM-8334 > Project: Beam > Issue Type: New Feature > Components: dsl-sql-zetasql >Reporter: Andrew Pilloud >Assignee: Andrew Pilloud >Priority: Trivial > > Google has a set of compliance tests for ZetaSQL. The test framework needs > access to LanguageOptions to determine what tests are supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321503 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 20:30 Start Date: 01/Oct/19 20:30 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330260043
## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ClassLoaderArtifactRetrievalService.java
## @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link ArtifactRetrievalService} that loads artifacts as {@link ClassLoader} resources.
+ *
+ * The retrieval token should be a path to a JSON-formatted ProxyManifest accessible via {@link
+ * ClassLoader#getResource(String)} whose resource locations also point to paths loadable via {@link
+ * ClassLoader#getResource(String)}.
+ */
+public class ClassLoaderArtifactRetrievalService extends AbstractArtifactRetrievalService {
+
+  private final ClassLoader classLoader;
+
+  public ClassLoaderArtifactRetrievalService() {
+    this(ClassLoaderArtifactRetrievalService.class.getClassLoader());
+  }
+
+  public ClassLoaderArtifactRetrievalService(ClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+
+  @Override
+  public InputStream openManifest(String retrievalToken) throws IOException {
+    return openUri(retrievalToken, retrievalToken);
+  }
+
+  @Override
+  public InputStream openUri(String retrievalToken, String uri) throws IOException {
+    if (uri.charAt(0) == '/') {
Review comment: Ah, missed that. Thanks for the explanation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321503) Time Spent: 2h 50m (was: 2h 40m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 2h 50m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. 
> > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
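The proposal in BEAM-8312 is that, since the manifest and artifacts already live on the classpath of the jar shipped to workers, a retrieval service can read them straight out of the archive instead of re-staging them. A rough Python analogy using zipfile (the file names and manifest shape here are made up for illustration; Beam's actual ProxyManifest format differs):

```python
import io
import json
import zipfile

# Build an in-memory "jar" containing a manifest and one artifact,
# mirroring the layout described above (names are illustrative only).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as jar:
    jar.writestr(
        "artifacts/manifest.json",
        json.dumps({"artifacts": [{"name": "dep.whl",
                                   "uri": "artifacts/dep.whl"}]}))
    jar.writestr("artifacts/dep.whl", b"fake artifact bytes")

# A retrieval service over the jar just opens entries by path -- no
# remote staging is needed, because the jar already ships to workers.
with zipfile.ZipFile(buf) as jar:
    manifest = json.loads(jar.read("artifacts/manifest.json"))
    artifact = jar.read(manifest["artifacts"][0]["uri"])

assert artifact == b"fake artifact bytes"
```

In the Java implementation the same lookup is done through `ClassLoader#getResource(String)`, since jar entries are classpath resources.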
[jira] [Work started] (BEAM-8334) Expose Language Options for testing
[ https://issues.apache.org/jira/browse/BEAM-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-8334 started by Andrew Pilloud. > Expose Language Options for testing > --- > > Key: BEAM-8334 > URL: https://issues.apache.org/jira/browse/BEAM-8334 > Project: Beam > Issue Type: New Feature > Components: dsl-sql-zetasql >Reporter: Andrew Pilloud >Assignee: Andrew Pilloud >Priority: Trivial > > Google has a set of compliance tests for ZetaSQL. The test framework needs > access to LanguageOptions to determine what tests are supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321502 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 20:21 Start Date: 01/Oct/19 20:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330256261
## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ClassLoaderArtifactRetrievalService.java [duplicate quoted excerpt of this file omitted]
+    if (uri.charAt(0) == '/') {
Review comment: Looks like you're using the class's getResource (which differentiates between relative and absolute). Here I'm using the ClassLoader's. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321502) Time Spent: 2h 40m (was: 2.5h) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 2h 40m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. 
We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. > > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321501=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321501 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 20:20 Start Date: 01/Oct/19 20:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330255831 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/JavaFilesystemArtifactStagingService.java ## @@ -30,7 +30,7 @@ /** * An {@link ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase} that loads artifacts into a - * Java FileSystem. + * Java {@link }FileSystem}. Review comment: Gah. Yes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321501) Time Spent: 2.5h (was: 2h 20m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 2.5h > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. 
We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. > > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7223) Add ValidatesRunner test suite for Flink on Python 3.
[ https://issues.apache.org/jira/browse/BEAM-7223?focusedWorklogId=321500=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321500 ] ASF GitHub Bot logged work on BEAM-7223: Author: ASF GitHub Bot Created on: 01/Oct/19 20:13 Start Date: 01/Oct/19 20:13 Worklog Time Spent: 10m Work Description: angoenka commented on pull request #9691: [BEAM-7223] Add Python 3.5 Flink ValidatesRunner postcommit suite. URL: https://github.com/apache/beam/pull/9691#discussion_r330251926 ## File path: sdks/python/test-suites/portable/py35/build.gradle ## @@ -29,4 +30,3 @@ task preCommitPy35() { dependsOn portableWordCountBatch dependsOn portableWordCountStreaming } Review comment: empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321500) Time Spent: 4h 10m (was: 4h) > Add ValidatesRunner test suite for Flink on Python 3. > - > > Key: BEAM-7223 > URL: https://issues.apache.org/jira/browse/BEAM-7223 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ankur Goenka >Assignee: Valentyn Tymofieiev >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4h 10m > Remaining Estimate: 0h > > Add py3 integration tests for Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321487&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321487 ]

ASF GitHub Bot logged work on BEAM-7760:
Author: ASF GitHub Bot
Created on: 01/Oct/19 20:00
Start Date: 01/Oct/19 20:00
Worklog Time Spent: 10m

Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330247591

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive import interactive_environment as ie
+
+READ_CACHE = "_ReadCache_"
+WRITE_CACHE = "_WriteCache_"
+
+
+class PipelineInstrument(object):
+  """A pipeline instrument for pipeline to be executed by interactive runner.
+
+  This module should never depend on underlying runner that interactive runner
+  delegates. It instruments the original instance of pipeline directly by
+  appending or replacing transforms with help of cache. It provides
+  interfaces to recover states of original pipeline. It's the interactive
+  runner's responsibility to coordinate supported underlying runners to run
+  the pipeline instrumented and recover the original pipeline states if needed.
+  """
+
+  def __init__(self, pipeline, options=None):
+    self._pipeline = pipeline
+    # The cache manager should be initiated outside of this module and outside
+    # of run_pipeline() from interactive runner so that its lifespan could cover
+    # multiple runs in the interactive environment. Owned by
+    # interactive_environment module. Not owned by this module.
+    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
+    # pipeline result instances because a pipeline is not 1:1 correlated to a
+    # running job. Only complete and read-only cache is valid across multiple
+    # jobs. Other cache instances should have their own scopes. Some design
+    # change should support only runner.run(pipeline) pattern rather than
+    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
+    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    self._cache_manager = ie.current_env().cache_manager()
+
+    # Invoke a round trip through the runner API. This makes sure the Pipeline
+    # proto is stable. The snapshot of pipeline will not be mutated within this
+    # module and can be used to recover original pipeline if needed.
+    self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
+        pipeline.to_runner_api(use_fake_coders=True),
+        pipeline.runner,
+        options)
+    # Snapshot of original pipeline information.
+    (self._original_pipeline_proto,
+     self._original_context) = self._pipeline_snap.to_runner_api(
+         return_context=True, use_fake_coders=True)
+
+    # All compute-once-against-original-pipeline fields.
+    self._has_unbounded_source = has_unbounded_source(self._pipeline_snap)
+    # TODO(BEAM-7760): once cache scope changed, this is not needed to manage
+    # relationships across pipelines, runners, and jobs.
+    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap,
+                                                  self._original_context)
+
+    # A mapping from PCollection id to python id() value in user defined
+    # pipeline instance.
+    (self._pcoll_version_map,
+     self._cacheables) = cacheables(self.pcolls_to_pcoll_id())
+
+    # A dict from cache key to PCollection that is read from cache.
+    # If
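The constructor quoted above wires the user's pipeline to a cache manager and uses the `READ_CACHE`/`WRITE_CACHE` markers so that transforms whose outputs are already cached can be replaced by cache reads on re-execution, while uncached outputs get a cache write appended. A minimal, self-contained sketch of that append-or-replace idea, with toy names and an in-memory cache manager standing in for Beam's actual API:

```python
READ_CACHE = "_ReadCache_"
WRITE_CACHE = "_WriteCache_"


class InMemoryCacheManager:
    """Toy stand-in for the interactive environment's cache manager."""

    def __init__(self):
        self._store = {}

    def exists(self, key):
        return key in self._store

    def write(self, key, value):
        self._store[key] = value

    def read(self, key):
        return self._store[key]


def instrument(transforms, cache_manager):
    """Rewrites a linear list of (name, fn) steps against the cache.

    Cached steps are replaced by cache reads; uncached steps run and then
    write their result to the cache, mirroring the append-or-replace strategy.
    """
    plan = []
    for name, fn in transforms:
        key = name  # Beam derives keys from PCollection ids; a name suffices here.
        if cache_manager.exists(key):
            plan.append((READ_CACHE + name,
                         lambda x, k=key: cache_manager.read(k)))
        else:
            def run_and_cache(x, f=fn, k=key):
                result = f(x)
                cache_manager.write(k, result)
                return result
            plan.append((WRITE_CACHE + name, run_and_cache))
    return plan


def run(plan, value):
    # Executes the instrumented plan on a single input value.
    for _, fn in plan:
        value = fn(value)
    return value
```

On a second `instrument` call every step becomes a cache read, so re-running the plan skips the original computation entirely; the real module does the equivalent by replacing subgraphs of the pipeline proto.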
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321481 ]

ASF GitHub Bot logged work on BEAM-7760:
Author: ASF GitHub Bot
Created on: 01/Oct/19 19:52
Start Date: 01/Oct/19 19:52
Worklog Time Spent: 10m

Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330244341

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321477 ]

ASF GitHub Bot logged work on BEAM-7760:
Author: ASF GitHub Bot
Created on: 01/Oct/19 19:50
Start Date: 01/Oct/19 19:50
Worklog Time Spent: 10m

Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330242462

## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ##
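The `from_runner_api(to_runner_api(...))` round trip quoted in this thread snapshots the pipeline: serializing to the portable representation and deserializing back yields an equivalent but independent copy, so later mutation of the live pipeline cannot affect the snapshot. A toy illustration of that pattern, using `pickle` as a stand-in for the runner API proto (the class and method names are invented, not Beam's API):

```python
import pickle


class ToyPipeline:
    """Toy pipeline holding a mutable list of transform names."""

    def __init__(self, transforms=None):
        self.transforms = list(transforms or [])

    def to_proto(self):
        # Stands in for Pipeline.to_runner_api(): a stable serialized form.
        return pickle.dumps(self.transforms)

    @classmethod
    def from_proto(cls, proto):
        # Stands in for Pipeline.from_runner_api().
        return cls(pickle.loads(proto))


pipeline = ToyPipeline(["Read", "ParDo(Process)"])
# Round trip through the serialized form to take an independent snapshot.
snapshot = ToyPipeline.from_proto(pipeline.to_proto())

# Mutating the live pipeline leaves the snapshot untouched, so the snapshot
# can later be used to recover the original structure.
pipeline.transforms.append("WriteToCache")
```

The design point is the same as in the quoted constructor: a shallow reference would alias the mutable pipeline, whereas a serialize/deserialize round trip guarantees a stable, recoverable copy.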
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321471 ]

ASF GitHub Bot logged work on BEAM-8312:
Author: ASF GitHub Bot
Created on: 01/Oct/19 19:39
Start Date: 01/Oct/19 19:39
Worklog Time Spent: 10m

Work Description: robertwb commented on issue #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar.
URL: https://github.com/apache/beam/pull/9681#issuecomment-537195899

Run Java PreCommit

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 321471)
Time Spent: 2h 20m (was: 2h 10m)

> Flink portable pipeline jars do not need to stage artifacts remotely
> --------------------------------------------------------------------
>
> Key: BEAM-8312
> URL: https://issues.apache.org/jira/browse/BEAM-8312
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Kyle Weaver
> Assignee: Kyle Weaver
> Priority: Major
> Labels: portability-flink
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> Currently, Flink job jars re-stage all artifacts at runtime (on the
> JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1].
> However, since the manifest and all the artifacts live on the classpath of
> the jar, and everything from the classpath is copied to the Flink workers
> anyway [2], it should not be necessary to do additional artifact staging. We
> could replace BeamFileSystemArtifactRetrievalService in this case with a
> simple ArtifactRetrievalService that just pulls the artifacts from the
> classpath.
>
> [1] [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java]
> [2] [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Work logged] (BEAM-8287) Update documentation for Python 3 support after Beam 2.16.0.
[ https://issues.apache.org/jira/browse/BEAM-8287?focusedWorklogId=321469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321469 ] ASF GitHub Bot logged work on BEAM-8287: Author: ASF GitHub Bot Created on: 01/Oct/19 19:27 Start Date: 01/Oct/19 19:27 Worklog Time Spent: 10m Work Description: soyrice commented on issue #9700: [BEAM-8287] Python3 GA docs updates URL: https://github.com/apache/beam/pull/9700#issuecomment-537191477 > Any changes needed for the Quickstart? https://beam.apache.org/get-started/quickstart-py/#check-your-python-version None needed - checked with Valentyn. The snippets are version-agnostic and the opening line still works, but will need to change once 2.7 is deprecated. I'll add a note about the upcoming deprecation like in the programming guide This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321469) Time Spent: 0.5h (was: 20m) > Update documentation for Python 3 support after Beam 2.16.0. > > > Key: BEAM-8287 > URL: https://issues.apache.org/jira/browse/BEAM-8287 > Project: Beam > Issue Type: Sub-task > Components: website >Reporter: Valentyn Tymofieiev >Assignee: Cyrus Maden >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321456=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321456 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 19:01 Start Date: 01/Oct/19 19:01 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330220994 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/JavaFilesystemArtifactStagingService.java ## @@ -30,7 +30,7 @@ /** * An {@link ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase} that loads artifacts into a - * Java FileSystem. + * Java {@link }FileSystem}. Review comment: Looks like there's an extra brace here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321456) Time Spent: 2h (was: 1h 50m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 2h > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. 
We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. > > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321457=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321457 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 19:01 Start Date: 01/Oct/19 19:01 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330222775 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ClassLoaderArtifactRetrievalService.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.artifact; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An {@link ArtifactRetrievalService} that loads artifacts as {@link ClassLoader} resources. + * + * The retrieval token should be a path to a JSON-formatted ProxyManifest accessible vai {@link + * ClassLoader#getResource(String)} whose resource locations also point to paths loadable via {@link + * ClassLoader#getResource(String)}. 
+ */ +public class ClassLoaderArtifactRetrievalService extends AbstractArtifactRetrievalService { + + private final ClassLoader classLoader; + + public ClassLoaderArtifactRetrievalService() { +this(ClassLoaderArtifactRetrievalService.class.getClassLoader()); + } + + public ClassLoaderArtifactRetrievalService(ClassLoader classLoader) { +this.classLoader = classLoader; + } + + @Override + public InputStream openManifest(String retrievalToken) throws IOException { +return openUri(retrievalToken, retrievalToken); + } + + @Override + public InputStream openUri(String retrievalToken, String uri) throws IOException { +if (uri.charAt(0) == '/') { Review comment: That's strange.. I was having the opposite problem, where I had to add the leading slash or else it couldn't resolve my uris (eg https://github.com/apache/beam/blob/81c44b446d40eff6812f45ed7c4e78e845f2eee2/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java#L108) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321457) Time Spent: 2h 10m (was: 2h) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 2h 10m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. 
> However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. > > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] >
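A side note on the leading-slash question raised in the review above: `ClassLoader.getResource` treats resource names as absolute already, so a leading `/` typically makes the lookup fail, and code handed absolute-style URIs usually strips one first. A minimal sketch of that normalization (the class and method names here are hypothetical illustrations, not Beam's actual implementation):

```java
public class ArtifactPaths {
    // Hypothetical helper: ClassLoader.getResource expects a name without
    // a leading '/', so strip one if present before resolving, mirroring
    // the uri.charAt(0) == '/' check discussed in the PR diff.
    static String normalize(String uri) {
        return uri.startsWith("/") ? uri.substring(1) : uri;
    }

    public static void main(String[] args) {
        System.out.println(normalize("/artifacts/manifest.json")); // artifacts/manifest.json
    }
}
```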
[jira] [Work logged] (BEAM-8287) Update documentation for Python 3 support after Beam 2.16.0.
[ https://issues.apache.org/jira/browse/BEAM-8287?focusedWorklogId=321451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321451 ] ASF GitHub Bot logged work on BEAM-8287: Author: ASF GitHub Bot Created on: 01/Oct/19 18:55 Start Date: 01/Oct/19 18:55 Worklog Time Spent: 10m Work Description: rosetn commented on issue #9700: [BEAM-8287] Python3 GA docs updates URL: https://github.com/apache/beam/pull/9700#issuecomment-537179218 Any changes needed for the Quickstart? https://beam.apache.org/get-started/quickstart-py/#check-your-python-version This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321451) Time Spent: 20m (was: 10m) > Update documentation for Python 3 support after Beam 2.16.0. > > > Key: BEAM-8287 > URL: https://issues.apache.org/jira/browse/BEAM-8287 > Project: Beam > Issue Type: Sub-task > Components: website >Reporter: Valentyn Tymofieiev >Assignee: Cyrus Maden >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?focusedWorklogId=321449=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321449 ] ASF GitHub Bot logged work on BEAM-5967: Author: ASF GitHub Bot Created on: 01/Oct/19 18:52 Start Date: 01/Oct/19 18:52 Worklog Time Spent: 10m Work Description: pabloem commented on issue #8496: [BEAM-5967] Add handling of DynamicMessage in ProtoCoder URL: https://github.com/apache/beam/pull/8496#issuecomment-537177956 r: @TheNeuralBit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321449) Time Spent: 4h (was: 3h 50m) > ProtoCoder doesn't support DynamicMessage > - > > Key: BEAM-5967 > URL: https://issues.apache.org/jira/browse/BEAM-5967 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.8.0 >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Fix For: 2.17.0 > > Time Spent: 4h > Remaining Estimate: 0h > > The ProtoCoder does make some assumptions about static messages being > available. The DynamicMessage doesn't have some of them, mainly because the > proto schema is defined at runtime and not at compile time. > Does it make sense to make a special coder for DynamicMessage or build it > into the normal ProtoCoder. 
> Here is an example of the assumption being made in the current Codec: > {code:java} > try { > @SuppressWarnings("unchecked") > T protoMessageInstance = (T) > protoMessageClass.getMethod("getDefaultInstance").invoke(null); > @SuppressWarnings("unchecked") > Parser tParser = (Parser) protoMessageInstance.getParserForType(); > memoizedParser = tParser; > } catch (IllegalAccessException | InvocationTargetException | > NoSuchMethodException e) { > throw new IllegalArgumentException(e); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
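The reflective lookup in the quoted snippet only works for message classes that expose a static no-arg getDefaultInstance(); DynamicMessage does not (its schema arrives at runtime via a Descriptor, so it only offers getDefaultInstance(Descriptor)), which is where the assumption breaks. A small self-contained illustration of that kind of reflective check, using plain JDK classes as hypothetical stand-ins for the protobuf types:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectionAssumption {
    // Returns true only if clazz exposes a public static no-arg method
    // with the given name -- the shape the ProtoCoder snippet relies on.
    static boolean hasStaticNoArg(Class<?> clazz, String name) {
        try {
            Method m = clazz.getMethod(name);
            return Modifier.isStatic(m.getModifiers());
        } catch (NoSuchMethodException e) {
            // The path a DynamicMessage-like class takes: no such method,
            // which the quoted code turns into IllegalArgumentException.
            return false;
        }
    }

    public static void main(String[] args) {
        // java.util.Locale.getDefault() is static and no-arg: found.
        System.out.println(hasStaticNoArg(java.util.Locale.class, "getDefault"));
        // String has no getDefaultInstance at all: not found.
        System.out.println(hasStaticNoArg(String.class, "getDefaultInstance"));
    }
}
```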
[jira] [Updated] (BEAM-8333) Python SDK Worker should log lulls with progress-reporting thread
[ https://issues.apache.org/jira/browse/BEAM-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Estrada updated BEAM-8333: Status: Open (was: Triage Needed) > Python SDK Worker should log lulls with progress-reporting thread > - > > Key: BEAM-8333 > URL: https://issues.apache.org/jira/browse/BEAM-8333 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8291) org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone
[ https://issues.apache.org/jira/browse/BEAM-8291?focusedWorklogId=321448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321448 ] ASF GitHub Bot logged work on BEAM-8291: Author: ASF GitHub Bot Created on: 01/Oct/19 18:49 Start Date: 01/Oct/19 18:49 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #9678: [BEAM-8291] Always use UTC in generated code to convert to a LocalDate URL: https://github.com/apache/beam/pull/9678 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321448) Time Spent: 0.5h (was: 20m) > org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone > --- > > Key: BEAM-8291 > URL: https://issues.apache.org/jira/browse/BEAM-8291 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Brian Hulette >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > I noticed that AvroSchemaTest.{testRowToSpecificRecord, testRowToPojo} are > failing on my local machine, but don't seem to fail on Jenkins. I looked into > it a bit and it seems to be because my default timezone is > America/Los_Angeles, but the ROW_FOR_POJO is initialized with an Instant at > the start of the day in UTC -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8291) org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone
[ https://issues.apache.org/jira/browse/BEAM-8291?focusedWorklogId=321445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321445 ] ASF GitHub Bot logged work on BEAM-8291: Author: ASF GitHub Bot Created on: 01/Oct/19 18:48 Start Date: 01/Oct/19 18:48 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #9678: [BEAM-8291] Always use UTC in generated code to convert to a LocalDate URL: https://github.com/apache/beam/pull/9678#issuecomment-537176594 lgtm This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321445) Time Spent: 20m (was: 10m) > org.apache.beam.sdk.schemas.AvroSchemaTest sensitive to system timezone > --- > > Key: BEAM-8291 > URL: https://issues.apache.org/jira/browse/BEAM-8291 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Brian Hulette >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > I noticed that AvroSchemaTest.{testRowToSpecificRecord, testRowToPojo} are > failing on my local machine, but don't seem to fail on Jenkins. I looked into > it a bit and it seems to be because my default timezone is > America/Los_Angeles, but the ROW_FOR_POJO is initialized with an Instant at > the start of the day in UTC -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8333) Python SDK Worker should log lulls with progress-reporting thread
Pablo Estrada created BEAM-8333: --- Summary: Python SDK Worker should log lulls with progress-reporting thread Key: BEAM-8333 URL: https://issues.apache.org/jira/browse/BEAM-8333 Project: Beam Issue Type: Improvement Components: sdk-py-harness Reporter: Pablo Estrada Assignee: Pablo Estrada -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321440 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 18:43 Start Date: 01/Oct/19 18:43 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703#issuecomment-537174145 R: @apilloud This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321440) Time Spent: 20m (was: 10m) > SQL aggregation with where clause fails to plan > --- > > Key: BEAM-6995 > URL: https://issues.apache.org/jira/browse/BEAM-6995 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.11.0 >Reporter: David McIntosh >Assignee: Kirill Kozlov >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > I'm finding that this code fails with a CannotPlanException listed below. > {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .build(); > Row row = Row.withSchema(schema).addValues(1, 2).build(); > PCollection inputData = p.apply("row input", > Create.of(row).withRowSchema(schema)); > inputData.apply("sql", > SqlTransform.query( > "SELECT id, SUM(val) " > + "FROM PCOLLECTION " > + "WHERE val > 0 " > + "GROUP BY id"));{code} > If the WHERE clause is removed the code runs successfully. > This may be similar to BEAM-5384 since I was able to work around this by > adding an extra column to the input that isn't reference in the sql. 
> {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .addInt32Field("extra") > .build();{code} > > {code:java} > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state: > Root: rel#100:Subset#2.BEAM_LOGICAL > Original rel: > LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}], > EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows, > 0.0 cpu, 0.0 io}, id = 98 > LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]): > rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96 > BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam, > PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, > 0.0 io}, id = 92 > Sets: > Set#0, type: RecordType(INTEGER id, INTEGER val) > rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, > importance=0.7291 > rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > rel#110:Subset#0.ENUMERABLE, best=rel#109, > importance=0.36455 > > rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#1, type: RecordType(INTEGER id, INTEGER val) > rel#97:Subset#1.NONE, best=null, importance=0.81 > > rel#96:LogicalFilter.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,condition=>($1, > 0)), rowcount=50.0, cumulative cost={inf} > > rel#102:LogicalCalc.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={inf} > rel#104:Subset#1.BEAM_LOGICAL, best=rel#103, importance=0.405 > > 
rel#103:BeamCalcRel.BEAM_LOGICAL(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={150.0 > rows, 801.0 cpu, 0.0 io} > rel#106:Subset#1.ENUMERABLE, best=rel#105, importance=0.405 > > rel#105:BeamEnumerableConverter.ENUMERABLE(input=rel#104:Subset#1.BEAM_LOGICAL), > rowcount=50.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#2, type: RecordType(INTEGER id, INTEGER EXPR$1) > rel#99:Subset#2.NONE, best=null, importance=0.9 > >
[jira] [Assigned] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirill Kozlov reassigned BEAM-6995: --- Assignee: Kirill Kozlov > SQL aggregation with where clause fails to plan > --- > > Key: BEAM-6995 > URL: https://issues.apache.org/jira/browse/BEAM-6995 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.11.0 >Reporter: David McIntosh >Assignee: Kirill Kozlov >Priority: Minor > > I'm finding that this code fails with a CannotPlanException listed below. > {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .build(); > Row row = Row.withSchema(schema).addValues(1, 2).build(); > PCollection inputData = p.apply("row input", > Create.of(row).withRowSchema(schema)); > inputData.apply("sql", > SqlTransform.query( > "SELECT id, SUM(val) " > + "FROM PCOLLECTION " > + "WHERE val > 0 " > + "GROUP BY id"));{code} > If the WHERE clause is removed the code runs successfully. > This may be similar to BEAM-5384 since I was able to work around this by > adding an extra column to the input that isn't reference in the sql. 
> {code:java} > Schema schema = Schema.builder() > .addInt32Field("id") > .addInt32Field("val") > .addInt32Field("extra") > .build();{code} > > {code:java} > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#100:Subset#2.BEAM_LOGICAL] could not be implemented; planner state: > Root: rel#100:Subset#2.BEAM_LOGICAL > Original rel: > LogicalAggregate(subset=[rel#100:Subset#2.BEAM_LOGICAL], group=[{0}], > EXPR$1=[SUM($1)]): rowcount = 5.0, cumulative cost = {5.687500238418579 rows, > 0.0 cpu, 0.0 io}, id = 98 > LogicalFilter(subset=[rel#97:Subset#1.NONE], condition=[>($1, 0)]): > rowcount = 50.0, cumulative cost = {50.0 rows, 100.0 cpu, 0.0 io}, id = 96 > BeamIOSourceRel(subset=[rel#95:Subset#0.BEAM_LOGICAL], table=[[beam, > PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, > 0.0 io}, id = 92 > Sets: > Set#0, type: RecordType(INTEGER id, INTEGER val) > rel#95:Subset#0.BEAM_LOGICAL, best=rel#92, > importance=0.7291 > rel#92:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > rel#110:Subset#0.ENUMERABLE, best=rel#109, > importance=0.36455 > > rel#109:BeamEnumerableConverter.ENUMERABLE(input=rel#95:Subset#0.BEAM_LOGICAL), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#1, type: RecordType(INTEGER id, INTEGER val) > rel#97:Subset#1.NONE, best=null, importance=0.81 > > rel#96:LogicalFilter.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,condition=>($1, > 0)), rowcount=50.0, cumulative cost={inf} > > rel#102:LogicalCalc.NONE(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={inf} > rel#104:Subset#1.BEAM_LOGICAL, best=rel#103, importance=0.405 > > 
rel#103:BeamCalcRel.BEAM_LOGICAL(input=rel#95:Subset#0.BEAM_LOGICAL,expr#0..1={inputs},expr#2=0,expr#3=>($t1, > $t2),id=$t0,val=$t1,$condition=$t3), rowcount=50.0, cumulative cost={150.0 > rows, 801.0 cpu, 0.0 io} > rel#106:Subset#1.ENUMERABLE, best=rel#105, importance=0.405 > > rel#105:BeamEnumerableConverter.ENUMERABLE(input=rel#104:Subset#1.BEAM_LOGICAL), > rowcount=50.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Set#2, type: RecordType(INTEGER id, INTEGER EXPR$1) > rel#99:Subset#2.NONE, best=null, importance=0.9 > > rel#98:LogicalAggregate.NONE(input=rel#97:Subset#1.NONE,group={0},EXPR$1=SUM($1)), > rowcount=5.0, cumulative cost={inf} > rel#100:Subset#2.BEAM_LOGICAL, best=null, importance=1.0 > > rel#101:AbstractConverter.BEAM_LOGICAL(input=rel#99:Subset#2.NONE,convention=BEAM_LOGICAL), > rowcount=5.0, cumulative cost={inf} > at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:437) > at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:296) > at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:657) > at >
[jira] [Work logged] (BEAM-6995) SQL aggregation with where clause fails to plan
[ https://issues.apache.org/jira/browse/BEAM-6995?focusedWorklogId=321439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321439 ] ASF GitHub Bot logged work on BEAM-6995: Author: ASF GitHub Bot Created on: 01/Oct/19 18:41 Start Date: 01/Oct/19 18:41 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9703: [BEAM-6995] Beam basic aggregation rule only when not windowed URL: https://github.com/apache/beam/pull/9703 Beam basic aggregation rule should not be applied on Calc, Project, and Filter when their parents/they utilize windowed functions. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321436 ] ASF GitHub Bot logged work on BEAM-7933: Author: ASF GitHub Bot Created on: 01/Oct/19 18:37 Start Date: 01/Oct/19 18:37 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds) URL: https://github.com/apache/beam/pull/9673#discussion_r330209685 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -251,7 +253,11 @@ def send_options_request(max_retries=5): # This reports channel is READY but connections may fail # Seems to be only an issue on Mac with port forwardings return job_service.DescribePipelineOptions( - beam_job_api_pb2.DescribePipelineOptionsRequest()) + beam_job_api_pb2.DescribePipelineOptionsRequest(), + timeout=portable_options.job_server_timeout) Review comment: Adding the timeout to the stubs fixes the tests, but are you sure adding the timeout like that is actually effective? In all the other places you changed, you put timeout in the requests, but here you put it in the service method, which I assumed was the problem. ie `DescribePipelineOptions(DescribePipelineOptionsRequest(), timeout=...)` should be `DescribePipelineOptions(DescribePipelineOptionsRequest(timeout=...))` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321436) Time Spent: 1h 50m (was: 1h 40m) > Adding timeout to JobServer grpc calls > -- > > Key: BEAM-7933 > URL: https://issues.apache.org/jira/browse/BEAM-7933 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.14.0 >Reporter: Enrico Canzonieri >Assignee: Enrico Canzonieri >Priority: Minor > Labels: portability > Time Spent: 1h 50m > Remaining Estimate: 0h > > grpc calls to the JobServer from the Python SDK do not have timeouts. That > means that the call to pipeline.run() could hang forever if the JobServer is > not running (or failing to start). > E.g. > [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] > the call to Prepare() doesn't provide any timeout value and the same applies > to other JobServer requests. > As part of this ticket we could add a default timeout of 60 seconds as the > default timeout for http client. > Additionally, we could consider adding a --job-server-request-timeout to the > [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] > class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
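The ticket and the review above combine two policies: every job-server request carries a deadline (defaulting to 60 seconds), and a request that times out is not retried, while other transient failures are (cf. `send_options_request(max_retries=5)` in the quoted diff). A minimal, dependency-free sketch of that policy follows; `RequestTimeoutError` and `call_with_retries` are illustrative stand-ins (the real code uses `grpc.FutureTimeoutError` and the generated job-service stubs), not Beam's actual implementation:

```python
import time


class RequestTimeoutError(Exception):
    """Illustrative stand-in for a gRPC timeout error such as grpc.FutureTimeoutError."""


def call_with_retries(request_fn, max_retries=5, timeout=60):
    """Invoke request_fn(timeout=...), retrying transient errors but not timeouts.

    A timeout means the job server is likely not running (or failing to
    start), so it is raised immediately instead of being retried.
    """
    for attempt in range(max_retries):
        try:
            return request_fn(timeout=timeout)
        except RequestTimeoutError:
            raise  # no retry for timeout errors
        except Exception:
            if attempt == max_retries - 1:
                raise  # retries exhausted; surface the last error
            time.sleep(0.01 * (attempt + 1))  # brief backoff before retrying
```

With a gRPC stub, `request_fn` would be a bound stub method such as `lambda timeout: job_service.DescribePipelineOptions(request, timeout=timeout)`; whether the deadline belongs on the call or on the request message is exactly what the review comment above is probing.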
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321435 ] ASF GitHub Bot logged work on BEAM-7933: Author: ASF GitHub Bot Created on: 01/Oct/19 18:37 Start Date: 01/Oct/19 18:37 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds) URL: https://github.com/apache/beam/pull/9673#discussion_r330206095 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -251,7 +253,11 @@ def send_options_request(max_retries=5): # This reports channel is READY but connections may fail # Seems to be only an issue on Mac with port forwardings return job_service.DescribePipelineOptions( - beam_job_api_pb2.DescribePipelineOptionsRequest()) + beam_job_api_pb2.DescribePipelineOptionsRequest(), + timeout=portable_options.job_server_timeout) +except grpc.FutureTimeoutError: + # no retry for timeout errors Review comment: Okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321435) Time Spent: 1h 50m (was: 1h 40m) > Adding timeout to JobServer grpc calls > -- > > Key: BEAM-7933 > URL: https://issues.apache.org/jira/browse/BEAM-7933 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.14.0 >Reporter: Enrico Canzonieri >Assignee: Enrico Canzonieri >Priority: Minor > Labels: portability > Time Spent: 1h 50m > Remaining Estimate: 0h > > grpc calls to the JobServer from the Python SDK do not have timeouts. That > means that the call to pipeline.run() could hang forever if the JobServer is > not running (or failing to start). > E.g. 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] > the call to Prepare() doesn't provide any timeout value and the same applies > to other JobServer requests. > As part of this ticket we could add a default timeout of 60 seconds as the > default timeout for http client. > Additionally, we could consider adding a --job-server-request-timeout to the > [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] > class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?focusedWorklogId=321437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321437 ] ASF GitHub Bot logged work on BEAM-7933: Author: ASF GitHub Bot Created on: 01/Oct/19 18:37 Start Date: 01/Oct/19 18:37 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #9673: [BEAM-7933] Add job server request timeout (default to 60 seconds) URL: https://github.com/apache/beam/pull/9673#discussion_r330211671 ## File path: sdks/python/apache_beam/runners/portability/local_job_service.py ## @@ -169,7 +169,7 @@ def GetMessageStream(self, request, context=None): resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg) yield resp - def DescribePipelineOptions(self, request, context=None): + def DescribePipelineOptions(self, request, context=None, timeout=None): Review comment: It doesn't look like the `timeout` argument is actually used -- see other comment below This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321437) Time Spent: 1h 50m (was: 1h 40m) > Adding timeout to JobServer grpc calls > -- > > Key: BEAM-7933 > URL: https://issues.apache.org/jira/browse/BEAM-7933 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.14.0 >Reporter: Enrico Canzonieri >Assignee: Enrico Canzonieri >Priority: Minor > Labels: portability > Time Spent: 1h 50m > Remaining Estimate: 0h > > grpc calls to the JobServer from the Python SDK do not have timeouts. That > means that the call to pipeline.run() could hang forever if the JobServer is > not running (or failing to start). > E.g. 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] > the call to Prepare() doesn't provide any timeout value and the same applies > to other JobServer requests. > As part of this ticket we could add a default timeout of 60 seconds as the > default timeout for http client. > Additionally, we could consider adding a --job-server-request-timeout to the > [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] > class to be used in the JobServer interactions inside portable_runner.py. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8317) SqlTransform doesn't support aggregation over a filter node
[ https://issues.apache.org/jira/browse/BEAM-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirill Kozlov closed BEAM-8317. --- Fix Version/s: Not applicable Assignee: Kirill Kozlov Resolution: Duplicate > SqlTransform doesn't support aggregation over a filter node > --- > > Key: BEAM-8317 > URL: https://issues.apache.org/jira/browse/BEAM-8317 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.15.0 >Reporter: Brian Hulette >Assignee: Kirill Kozlov >Priority: Major > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > For example, the following query fails to translate to a physical plan: > SELECT SUM(f_intValue) FROM PCOLLECTION WHERE f_intValue < 5 GROUP BY > f_intGroupingKey -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321429=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321429 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 18:28 Start Date: 01/Oct/19 18:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330205910 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/JavaFilesystemArtifactStagingService.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.artifact; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.stream.Stream; +import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc; + +/** + * An {@link ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase} that loads artifacts into a + * Java FileSystem. Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321429) Time Spent: 1h 40m (was: 1.5h) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 1h 40m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. 
> > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
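The core idea of BEAM-8312 — serve artifacts straight from the archive they shipped in, instead of re-staging them through a filesystem service — can be sketched outside Java. Below is a hypothetical Python analog in which a zip file stands in for the jar and a JSON file inside the archive stands in for the ProxyManifest; `ZipArtifactRetrievalService` and its manifest layout are invented for illustration and are not Beam's `ClassLoaderArtifactRetrievalService`:

```python
import io
import json
import zipfile


class ZipArtifactRetrievalService:
    """Toy analog of retrieving staged artifacts directly from an archive.

    The retrieval token is the path, inside the archive, of a JSON manifest
    whose entries point at other paths inside the same archive.
    """

    def __init__(self, archive):
        # `archive` may be a filename or any seekable file-like object.
        self._zip = zipfile.ZipFile(archive)

    def open_manifest(self, retrieval_token):
        return json.loads(self._zip.read(retrieval_token))

    def open_artifact(self, uri):
        # Paths inside the archive are resolved relative to its root, so a
        # leading slash is harmless and simply stripped (cf. the leading-slash
        # discussion in the review above).
        return io.BytesIO(self._zip.read(uri.lstrip('/')))
```

Because everything on the classpath already reaches the workers, retrieval reduces to local reads like these; no network staging step is needed.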
[jira] [Work logged] (BEAM-8312) Flink portable pipeline jars do not need to stage artifacts remotely
[ https://issues.apache.org/jira/browse/BEAM-8312?focusedWorklogId=321430=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321430 ] ASF GitHub Bot logged work on BEAM-8312: Author: ASF GitHub Bot Created on: 01/Oct/19 18:28 Start Date: 01/Oct/19 18:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9681: [BEAM-8312] ArtifactRetrievalService serving artifacts from jar. URL: https://github.com/apache/beam/pull/9681#discussion_r330205723 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ClassLoaderArtifactRetrievalService.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.artifact; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An {@link ArtifactRetrievalService} that loads artifacts as {@link ClassLoader} resources. + * + * The retrieval token should be a path to a JSON-formatted ProxyManifest accessible vai {@link + * ClassLoader#getResource(String)} whose resource locations also point to paths loadable via {@link + * ClassLoader#getResource(String)}. 
+ */ +public class ClassLoaderArtifactRetrievalService extends AbstractArtifactRetrievalService { + + private final ClassLoader classLoader; + + public ClassLoaderArtifactRetrievalService() { +this(ClassLoaderArtifactRetrievalService.class.getClassLoader()); + } + + public ClassLoaderArtifactRetrievalService(ClassLoader classLoader) { +this.classLoader = classLoader; + } + + @Override + public InputStream openManifest(String retrievalToken) throws IOException { +return openUri(retrievalToken, retrievalToken); + } + + @Override + public InputStream openUri(String retrievalToken, String uri) throws IOException { +if (uri.charAt(0) == '/') { Review comment: Yeah, the leading slash was (strangely) giving me errors. It is resolved relative to the root on the classloader, so this won't hurt. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 321430) Time Spent: 1h 50m (was: 1h 40m) > Flink portable pipeline jars do not need to stage artifacts remotely > > > Key: BEAM-8312 > URL: https://issues.apache.org/jira/browse/BEAM-8312 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Labels: portability-flink > Time Spent: 1h 50m > Remaining Estimate: 0h > > Currently, Flink job jars re-stage all artifacts at runtime (on the > JobManager) by using the usual BeamFileSystemArtifactRetrievalService [1]. > However, since the manifest and all the artifacts live on the classpath of > the jar, and everything from the classpath is copied to the Flink workers > anyway [2], it should not be necessary to do additional artifact staging. 
We > could replace BeamFileSystemArtifactRetrievalService in this case with a > simple ArtifactRetrievalService that just pulls the artifacts from the > classpath. > > [1] > [https://github.com/apache/beam/blob/340c3202b1e5824b959f5f9f626e4c7c7842a3cb/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java] > [2] > [https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L93] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321423=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321423 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 18:20 Start Date: 01/Oct/19 18:20 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330203922 ## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ## @@ -0,0 +1,470 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module to instrument interactivity to the given pipeline. + +For internal use only; no backwards-compatibility guarantees. +This module accesses current interactive environment and analyzes given pipeline +to transform original pipeline into a one-shot pipeline with interactivity. 
+""" +from __future__ import absolute_import + +import logging + +import apache_beam as beam +from apache_beam.pipeline import PipelineVisitor +from apache_beam.runners.interactive import cache_manager as cache +from apache_beam.runners.interactive import interactive_environment as ie + +READ_CACHE = "_ReadCache_" +WRITE_CACHE = "_WriteCache_" + + +class PipelineInstrument(object): + """A pipeline instrument for pipeline to be executed by interactive runner. + + This module should never depend on underlying runner that interactive runner + delegates. It instruments the original instance of pipeline directly by + appending or replacing transforms with help of cache. It provides + interfaces to recover states of original pipeline. It's the interactive + runner's responsibility to coordinate supported underlying runners to run + the pipeline instrumented and recover the original pipeline states if needed. + """ + + def __init__(self, pipeline, options=None): +self._pipeline = pipeline +# The cache manager should be initiated outside of this module and outside +# of run_pipeline() from interactive runner so that its lifespan could cover +# multiple runs in the interactive environment. Owned by +# interactive_environment module. Not owned by this module. +# TODO(BEAM-7760): change the scope of cache to be owned by runner or +# pipeline result instances because a pipeline is not 1:1 correlated to a +# running job. Only complete and read-only cache is valid across multiple +# jobs. Other cache instances should have their own scopes. Some design +# change should support only runner.run(pipeline) pattern rather than +# pipeline.run([runner]) and a runner can only run at most one pipeline at a +# time. Otherwise, result returned by run() is the only 1:1 anchor. +self._cache_manager = ie.current_env().cache_manager() + +# Invoke a round trip through the runner API. This makes sure the Pipeline +# proto is stable. 
The snapshot of pipeline will not be mutated within this +# module and can be used to recover original pipeline if needed. +self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( +pipeline.to_runner_api(use_fake_coders=True), +pipeline.runner, +options) +# Snapshot of original pipeline information. +(self._original_pipeline_proto, + self._original_context) = self._pipeline_snap.to_runner_api( + return_context=True, use_fake_coders=True) + +# All compute-once-against-original-pipeline fields. +self._has_unbounded_source = has_unbounded_source(self._pipeline_snap) +# TODO(BEAM-7760): once cache scope changed, this is not needed to manage +# relationships across pipelines, runners, and jobs. +self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap, + self._original_context) + +# A mapping from PCollection id to python id() value in user defined +# pipeline instance. +(self._pcoll_version_map, + self._cacheables) = cacheables(self.pcolls_to_pcoll_id()) + +# A dict from cache key to PCollection that is read from cache. +# If
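The constructor's round trip through the runner API exists to pin down a stable snapshot: once serialized and deserialized, the copy cannot be mutated by later instrumentation of the original pipeline. The same idea, reduced to a JSON round trip on a plain dict, looks like this (a sketch only — Beam uses `to_runner_api()`/`from_runner_api()` on the Runner API proto, not JSON, and `snapshot` is a name invented here):

```python
import json


def snapshot(pipeline_like):
    """Return a mutation-proof copy by round-tripping through a serialized form.

    Serializing and re-parsing severs all object references to the original,
    which is what makes the copy safe to keep for later recovery.
    """
    return json.loads(json.dumps(pipeline_like))
```

Mutating the original afterwards leaves the snapshot untouched, which is exactly the property `_pipeline_snap` relies on.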
[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321420=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321420 ] ASF GitHub Bot logged work on BEAM-7760: Author: ASF GitHub Bot Created on: 01/Oct/19 18:17 Start Date: 01/Oct/19 18:17 Worklog Time Spent: 10m Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module URL: https://github.com/apache/beam/pull/9619#discussion_r330202786 ## File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py ## @@ -0,0 +1,470 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module to instrument interactivity to the given pipeline. + +For internal use only; no backwards-compatibility guarantees. +This module accesses current interactive environment and analyzes given pipeline +to transform original pipeline into a one-shot pipeline with interactivity. 
+""" +from __future__ import absolute_import + +import logging + +import apache_beam as beam +from apache_beam.pipeline import PipelineVisitor +from apache_beam.runners.interactive import cache_manager as cache +from apache_beam.runners.interactive import interactive_environment as ie + +READ_CACHE = "_ReadCache_" +WRITE_CACHE = "_WriteCache_" + + +class PipelineInstrument(object): + """A pipeline instrument for pipeline to be executed by interactive runner. + + This module should never depend on underlying runner that interactive runner + delegates. It instruments the original instance of pipeline directly by + appending or replacing transforms with help of cache. It provides + interfaces to recover states of original pipeline. It's the interactive + runner's responsibility to coordinate supported underlying runners to run + the pipeline instrumented and recover the original pipeline states if needed. + """ + + def __init__(self, pipeline, options=None): +self._pipeline = pipeline +# The cache manager should be initiated outside of this module and outside +# of run_pipeline() from interactive runner so that its lifespan could cover +# multiple runs in the interactive environment. Owned by +# interactive_environment module. Not owned by this module. +# TODO(BEAM-7760): change the scope of cache to be owned by runner or +# pipeline result instances because a pipeline is not 1:1 correlated to a +# running job. Only complete and read-only cache is valid across multiple +# jobs. Other cache instances should have their own scopes. Some design +# change should support only runner.run(pipeline) pattern rather than +# pipeline.run([runner]) and a runner can only run at most one pipeline at a +# time. Otherwise, result returned by run() is the only 1:1 anchor. +self._cache_manager = ie.current_env().cache_manager() + +# Invoke a round trip through the runner API. This makes sure the Pipeline +# proto is stable. 
The snapshot of pipeline will not be mutated within this +# module and can be used to recover original pipeline if needed. +self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( +pipeline.to_runner_api(use_fake_coders=True), +pipeline.runner, +options) +# Snapshot of original pipeline information. +(self._original_pipeline_proto, + self._original_context) = self._pipeline_snap.to_runner_api( + return_context=True, use_fake_coders=True) + +# All compute-once-against-original-pipeline fields. +self._has_unbounded_source = has_unbounded_source(self._pipeline_snap) +# TODO(BEAM-7760): once cache scope changed, this is not needed to manage +# relationships across pipelines, runners, and jobs. +self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap, + self._original_context) + +# A mapping from PCollection id to python id() value in user defined +# pipeline instance. +(self._pcoll_version_map, + self._cacheables) = cacheables(self.pcolls_to_pcoll_id()) + +# A dict from cache key to PCollection that is read from cache. +# If
[jira] [Closed] (BEAM-6896) Beam Dependency Update Request: PyYAML
[ https://issues.apache.org/jira/browse/BEAM-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Bradshaw closed BEAM-6896. - Resolution: Fixed > Beam Dependency Update Request: PyYAML > -- > > Key: BEAM-6896 > URL: https://issues.apache.org/jira/browse/BEAM-6896 > Project: Beam > Issue Type: Bug > Components: dependencies >Reporter: Beam JIRA Bot >Assignee: Robert Bradshaw >Priority: Major > Fix For: 2.17.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > - 2019-03-25 04:17:47.501359 > - > Please consider upgrading the dependency PyYAML. > The current version is 3.13. The latest version is 5.1 > cc: > Please refer to [Beam Dependency Guide > |https://beam.apache.org/contribute/dependencies/]for more information. > Do Not Modify The Description Above. -- This message was sent by Atlassian Jira (v8.3.4#803005)