Abacn commented on code in PR #32389:
URL: https://github.com/apache/beam/pull/32389#discussion_r1752272325
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -425,7 +538,7 @@ static Row decodeDelegate(
// in which case we drop the extra fields.
if (encodingPos < coders.length) {
int rowIndex = encodingPosToIndex[encodingPos];
- if (nullFields.get(rowIndex)) {
+ if (nullFields.get(encodingPos)) {
Review Comment:
Was the previous code a bug? `rowIndex` and `encodingPos` look like different indices.
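
To make the question concrete, here is a minimal sketch of how the two lookups can disagree. It assumes the null bitset is laid out in encoding order, which the change implies but which I have not verified against the encoder. The class name, method names, and the `{2, 0, 1}` mapping are hypothetical and are not taken from `RowCoderGenerator`.

```java
import java.util.BitSet;

public class NullBitIndexSketch {
  // Hypothetical mapping: encoding position -> index of the field in the row schema.
  static final int[] ENCODING_POS_TO_INDEX = {2, 0, 1};

  // Reads the bitset by encoding position (as in the new line of the diff)
  // and reports which row indices are treated as null.
  static BitSet nullRowIndicesByEncodingPos(BitSet nullFields) {
    BitSet result = new BitSet();
    for (int encodingPos = 0; encodingPos < ENCODING_POS_TO_INDEX.length; encodingPos++) {
      if (nullFields.get(encodingPos)) {
        result.set(ENCODING_POS_TO_INDEX[encodingPos]);
      }
    }
    return result;
  }

  // Reads the bitset by row index (as in the removed line of the diff).
  static BitSet nullRowIndicesByRowIndex(BitSet nullFields) {
    BitSet result = new BitSet();
    for (int encodingPos = 0; encodingPos < ENCODING_POS_TO_INDEX.length; encodingPos++) {
      int rowIndex = ENCODING_POS_TO_INDEX[encodingPos];
      if (nullFields.get(rowIndex)) {
        result.set(rowIndex);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    BitSet nullFields = new BitSet();
    nullFields.set(0); // the value at encoding position 0 (row field 2) is null
    System.out.println(nullRowIndicesByEncodingPos(nullFields)); // prints {2}
    System.out.println(nullRowIndicesByRowIndex(nullFields)); // prints {0}
  }
}
```

With an identity mapping both lookups agree, so a mismatch like this would only show up once encoding positions are overridden.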
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -109,30 +113,121 @@ public abstract class RowCoderGenerator {
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
   private static final String POSITIONS_FIELD_NAME = "FIELD_ENCODING_POSITIONS";
-  // Cache for Coder class that are already generated.
-  private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
-  private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+  static class WithStackTrace<T> {
+    private final T value;
+    private final String stackTrace;
+
+    public WithStackTrace(T value, String stackTrace) {
+      this.value = value;
+      this.stackTrace = stackTrace;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    public String getStackTrace() {
+      return stackTrace;
+    }
+  }
+
+  // Cache for Coder class that are already generated. Coders are added with setOverridesLock held.
+  private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS = Maps.newConcurrentMap();
+  @GuardedBy("setOverridesLock")
+  private static final Map<UUID, WithStackTrace<Map<String, Integer>>> ENCODING_POSITION_OVERRIDES =
+      Maps.newHashMap();
+
+  private static final Object setOverridesLock = new Object();
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
+
+  private static String getStackTrace() {
+    StringBuilder builder = new StringBuilder();
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      builder.append(" at ").append(e).append("\n");
+    }
+    return builder.toString();
+  }
+
   public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
-    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+    final String stackTrace = getStackTrace();
+    synchronized (setOverridesLock) {
+      @Nullable
+      WithStackTrace<Map<String, Integer>> previousEncodingPositions =
+          ENCODING_POSITION_OVERRIDES.put(
+              uuid, new WithStackTrace<>(encodingPositions, stackTrace));
+      @Nullable WithStackTrace<Coder<Row>> existingCoder = GENERATED_CODERS.get(uuid);
+      if (previousEncodingPositions == null) {
+        if (existingCoder != null) {
+          LOG.error(
+              "Received encoding positions for uuid {} too late after creating RowCoder. Created: {}\n Override: {}",
+              uuid,
+              existingCoder.getStackTrace(),
+              stackTrace);
+        } else {
+          LOG.info("Received encoding positions {} for uuid {}.", encodingPositions, uuid);
+        }
+      } else if (!previousEncodingPositions.getValue().equals(encodingPositions)) {
+        if (existingCoder == null) {
+          LOG.error(
+              "Received differing encoding positions for uuid {} before coder creation. Was {} at {}\n Now {} at {}",
+              uuid,
+              previousEncodingPositions.getValue(),
+              encodingPositions,
+              previousEncodingPositions.getStackTrace(),
+              stackTrace);
+        } else {
+          LOG.error(
+              "Received differing encoding positions for uuid {} after coder creation at {}\n. "
+                  + "Was {} at {}\n Now {} at {}\n",
+              uuid,
+              existingCoder.getStackTrace(),
+              previousEncodingPositions.getValue(),
+              encodingPositions,
+              previousEncodingPositions.getStackTrace(),
+              stackTrace);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRowCoderCache() {
+    synchronized (setOverridesLock) {
Review Comment:
`GENERATED_CODERS` is already a concurrent map, so it usually does not need to be
wrapped in a synchronized block. I see that `setOverridesLock` is also used in
other places, which is probably the reason. If that is the case, consider adding
a comment to note it.
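
For reference, a minimal sketch of the pattern I am guessing at: a concurrent map makes each single operation thread-safe, but a shared lock is still needed when a step must see two maps in a consistent state. All names below are hypothetical stand-ins, not the PR's code, and the string values only stand in for coders and overrides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedLockSketch {
  private static final Object lock = new Object();

  // Concurrent, so reads on the fast path need no lock.
  private static final Map<String, String> generated = new ConcurrentHashMap<>();

  // Plain HashMap; every access must hold `lock`.
  private static final Map<String, String> overrides = new HashMap<>();

  // Records an override. Returns true if it arrived before the value was generated.
  static boolean setOverride(String key, String value) {
    synchronized (lock) {
      overrides.put(key, value);
      return !generated.containsKey(key);
    }
  }

  // Generates the value once, reading the override under the same lock so that
  // "check override, then publish" cannot interleave with setOverride.
  static String getOrCreate(String key) {
    String cached = generated.get(key);
    if (cached != null) {
      return cached;
    }
    synchronized (lock) {
      return generated.computeIfAbsent(
          key, k -> "coder(" + overrides.getOrDefault(k, "default") + ")");
    }
  }

  // Clears under the lock so a concurrent setOverride never observes a
  // half-cleared cache between its put and its containsKey check.
  static void clear() {
    synchronized (lock) {
      generated.clear();
    }
  }

  public static void main(String[] args) {
    System.out.println(setOverride("demo", "x")); // true: override came first
    System.out.println(getOrCreate("demo")); // coder(x)
    System.out.println(setOverride("demo", "y")); // false: too late
  }
}
```

If this is the intent, a one-line comment on `clearRowCoderCache` saying the lock keeps the cache consistent with `ENCODING_POSITION_OVERRIDES` would be enough.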
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -109,30 +113,121 @@ public abstract class RowCoderGenerator {
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
   private static final String POSITIONS_FIELD_NAME = "FIELD_ENCODING_POSITIONS";
-  // Cache for Coder class that are already generated.
-  private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
-  private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+  static class WithStackTrace<T> {
+    private final T value;
+    private final String stackTrace;
+
+    public WithStackTrace(T value, String stackTrace) {
+      this.value = value;
+      this.stackTrace = stackTrace;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    public String getStackTrace() {
+      return stackTrace;
+    }
+  }
+
+  // Cache for Coder class that are already generated. Coders are added with setOverridesLock held.
+  private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS = Maps.newConcurrentMap();
+  @GuardedBy("setOverridesLock")
+  private static final Map<UUID, WithStackTrace<Map<String, Integer>>> ENCODING_POSITION_OVERRIDES =
+      Maps.newHashMap();
+
+  private static final Object setOverridesLock = new Object();
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
+
+  private static String getStackTrace() {
+    StringBuilder builder = new StringBuilder();
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      builder.append(" at ").append(e).append("\n");
+    }
+    return builder.toString();
+  }
+
   public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
-    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+    final String stackTrace = getStackTrace();
+    synchronized (setOverridesLock) {
+      @Nullable
+      WithStackTrace<Map<String, Integer>> previousEncodingPositions =
+          ENCODING_POSITION_OVERRIDES.put(
+              uuid, new WithStackTrace<>(encodingPositions, stackTrace));
+      @Nullable WithStackTrace<Coder<Row>> existingCoder = GENERATED_CODERS.get(uuid);
+      if (previousEncodingPositions == null) {
+        if (existingCoder != null) {
+          LOG.error(
+              "Received encoding positions for uuid {} too late after creating RowCoder. Created: {}\n Override: {}",
Review Comment:
I also found it useful to print the stack trace in error/warning logs in #31924.
However, this sometimes made the log message extremely long, and it got
truncated in Dataflow logging. For example, this message prints two stack
traces, so it could be truncated before the second stack trace reaches the
point of interest.
The helper function
`sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.arrayToNewlines`
can be used to truncate a stack trace to a finite number of lines, if you find
it useful here.
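
As an illustration of the idea only, here is a self-contained sketch that caps the number of frames before they reach the log. It does not call the Beam helper; `StackTraceSketch`, `format`, and `maxFrames` are hypothetical names chosen for this example.

```java
public class StackTraceSketch {
  // Formats at most maxFrames frames, then notes how many were dropped,
  // so two traces in one log message both stay short.
  static String format(StackTraceElement[] frames, int maxFrames) {
    StringBuilder builder = new StringBuilder();
    int limit = Math.min(Math.max(maxFrames, 0), frames.length);
    for (int i = 0; i < limit; i++) {
      builder.append("  at ").append(frames[i]).append("\n");
    }
    if (frames.length > limit) {
      builder.append("  ... ").append(frames.length - limit).append(" more\n");
    }
    return builder.toString();
  }

  // Captures the current stack and truncates it to maxFrames frames.
  static String getStackTrace(int maxFrames) {
    return format(new Throwable().getStackTrace(), maxFrames);
  }

  public static void main(String[] args) {
    System.out.print(getStackTrace(5));
  }
}
```

Capping each trace at a fixed number of frames would keep the "Created" and "Override" traces both visible in a single message.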