scwhittle commented on code in PR #32389:
URL: https://github.com/apache/beam/pull/32389#discussion_r1753769288


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -109,30 +113,121 @@ public abstract class RowCoderGenerator {
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
   private static final String POSITIONS_FIELD_NAME = 
"FIELD_ENCODING_POSITIONS";
 
-  // Cache for Coder class that are already generated.
-  private static final Map<UUID, Coder<Row>> GENERATED_CODERS = 
Maps.newConcurrentMap();
-  private static final Map<UUID, Map<String, Integer>> 
ENCODING_POSITION_OVERRIDES =
+  static class WithStackTrace<T> {
+    private final T value;
+    private final String stackTrace;
+
+    public WithStackTrace(T value, String stackTrace) {
+      this.value = value;
+      this.stackTrace = stackTrace;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    public String getStackTrace() {
+      return stackTrace;
+    }
+  }
+
+  // Cache for Coder class that are already generated. Coders are added with 
setOverridesLock held.
+  private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS =
       Maps.newConcurrentMap();
 
+  @GuardedBy("setOverridesLock")
+  private static final Map<UUID, WithStackTrace<Map<String, Integer>>> 
ENCODING_POSITION_OVERRIDES =
+      Maps.newHashMap();
+
+  private static final Object setOverridesLock = new Object();
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowCoderGenerator.class);
+
+  private static String getStackTrace() {
+    StringBuilder builder = new StringBuilder();
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      builder.append("  at ").append(e).append("\n");
+    }
+    return builder.toString();
+  }
+
   public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> 
encodingPositions) {
-    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+    final String stackTrace = getStackTrace();
+    synchronized (setOverridesLock) {
+      @Nullable
+      WithStackTrace<Map<String, Integer>> previousEncodingPositions =
+          ENCODING_POSITION_OVERRIDES.put(
+              uuid, new WithStackTrace<>(encodingPositions, stackTrace));
+      @Nullable WithStackTrace<Coder<Row>> existingCoder = 
GENERATED_CODERS.get(uuid);
+      if (previousEncodingPositions == null) {
+        if (existingCoder != null) {
+          LOG.error(
+              "Received encoding positions for uuid {} too late after creating 
RowCoder. Created: {}\n Override: {}",

Review Comment:
   Thanks! Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to