kennknowles commented on code in PR #36962:
URL: https://github.com/apache/beam/pull/36962#discussion_r2982303498
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -633,7 +633,8 @@ class BeamModulePlugin implements Plugin<Project> {
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with:
google_cloud_platform_libraries_bom
def netty_version = "4.1.124.Final"
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk,
consistent with: google_cloud_platform_libraries_bom
- def opentelemetry_version = "1.51.0"
+ def opentelemetry_sdk_version = "1.56.0"
Review Comment:
Can we bump the version in its own commit that includes just the necessary
changes?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class OpenTelemetryContextPropagator {
+
+ private static final
TextMapSetter<BeamFnApi.Elements.ElementMetadata.Builder> SETTER =
Review Comment:
I feel like errorprone is going to tell you to make this a static method and
use a method reference.
##########
sdks/java/container/license_scripts/dep_urls_java.yaml:
##########
@@ -66,12 +66,12 @@ org.eclipse.jgit:
license: "https://www.eclipse.org/org/documents/edl-v10.html"
type: "Eclipse Distribution License - v1.0"
opentelemetry-bom:
- '1.51.0':
- license:
"https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.51.0/LICENSE"
+ '1.56.0':
Review Comment:
ditto
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java:
##########
@@ -49,6 +50,9 @@ public interface WindowedValue<T> {
@Nullable
String getRecordId();
+ @Nullable
+ Context getOpenTelemetryContext();
Review Comment:
Good question. Mostly default implementation is to allow backwards
compatibility. But in this case, not:
- having the IDE/compiler find all the places we need to think about this
is actually a good thing
- users should not be implementing this, so breaking them is fine/good
For testing places where people _really_ want a default implementation, we
could make a class like `DefaultEmptyWindowedValue` that has this override to
`null`. Then places where someone is writing a test or wants to save
boilerplate they have to opt-in to extending `DefaultEmptyWindowedValue`.
I actually think the number of places for this is pretty low, so I would not
bother with any of that.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -132,29 +150,32 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
@Override
public ValueInSingleWindow<T> decode(InputStream inStream) throws
IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, Coder.Context.NESTED);
}
@Override
@SuppressWarnings("IgnoredPureGetter")
- public ValueInSingleWindow<T> decode(InputStream inStream, Context
context) throws IOException {
+ public ValueInSingleWindow<T> decode(InputStream inStream, Coder.Context
context)
+ throws IOException {
Instant timestamp = InstantCoder.of().decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
+ io.opentelemetry.context.@Nullable Context openTelemetryContext = null;
Review Comment:
It looks even weirder when it is an inner class `(OuterClass.@Nullable
InnerClass)`. Not only is it conventional, it is required. (there is no other
valid syntax for this)
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -123,6 +135,12 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
// todo #33176 specify additional metadata in the future
+ io.opentelemetry.context.Context openTelemetryContext =
+ windowedElem.getOpenTelemetryContext();
+ if (openTelemetryContext != null) {
+
+ OpenTelemetryContextPropagator.write(openTelemetryContext, builder);
Review Comment:
I don't like `write` as the name, because it tends to imply IO. I think you
are actually doing a `set`.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class OpenTelemetryContextPropagator {
+
+ private static final
TextMapSetter<BeamFnApi.Elements.ElementMetadata.Builder> SETTER =
+ (carrier, key, value) -> {
+ if (carrier == null) {
+ return;
+ }
+ if ("traceparent".equals(key)) {
+ carrier.setTraceparent(value);
+ } else if ("tracestate".equals(key)) {
+ carrier.setTracestate(value);
+ }
+ };
+
+ private static final TextMapGetter<BeamFnApi.Elements.ElementMetadata>
GETTER =
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]