This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 41c3811 Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field" new 6af9d75 Merge pull request #13696 from y1chi/revert-13533-test_logging 41c3811 is described below commit 41c3811a88181dd4e8a15d79bc3b3a4abf54bd1b Author: Yichi Zhang <zyi...@google.com> AuthorDate: Thu Jan 7 14:05:54 2021 -0800 Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field" --- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 27 ------------ .../harness/TransformProcessingThreadTracker.java | 49 ---------------------- .../fn/harness/logging/BeamFnLoggingClient.java | 8 ---- 3 files changed, 84 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 23b7699..95ca03e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -724,8 +724,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void startBundle() { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); // Register as a consumer for each timer. timerHandlers = new HashMap<>(); for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo : @@ -745,8 +743,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void processElementForParDo(WindowedValue<InputT> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem; try { doFnInvoker.invokeProcessElement(processContext); @@ -756,8 +752,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem; try { Iterator<BoundedWindow> windowIterator = @@ -773,8 +767,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void processElementForPairWithRestriction(WindowedValue<InputT> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem; try { currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext); @@ -797,8 +789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem; try { Iterator<BoundedWindow> windowIterator = @@ -831,8 +821,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private void processElementForSplitRestriction( WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem.withValue(elem.getValue().getKey()); currentRestriction = elem.getValue().getValue().getKey(); currentWatermarkEstimatorState = elem.getValue().getValue().getValue(); @@ -861,8 +849,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private void processElementForWindowObservingSplitRestriction( WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem.withValue(elem.getValue().getKey()); currentRestriction = elem.getValue().getValue().getKey(); currentWatermarkEstimatorState = elem.getValue().getValue().getValue(); @@ -897,8 +883,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private void processElementForTruncateRestriction( WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem.withValue(elem.getValue().getKey().getKey()); currentRestriction = elem.getValue().getKey().getValue().getKey(); currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue(); @@ -931,8 +915,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private void processElementForWindowObservingTruncateRestriction( WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem.withValue(elem.getValue().getKey().getKey()); try { windowCurrentIndex = -1; @@ -1030,8 +1012,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private void processElementForWindowObservingSizedElementAndRestriction( WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentElement = elem.withValue(elem.getValue().getKey().getKey()); try { windowCurrentIndex = -1; @@ -1642,8 +1622,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private <K> void processTimer( String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); currentTimer = timer; currentTimeDomain = timeDomain; // The timerIdOrTimerFamilyId contains either a timerId from timer declaration or timerFamilyId @@ -1671,9 +1649,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void finishBundle() throws Exception { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); - for (TimerHandler timerHandler : timerHandlers.values()) { timerHandler.awaitCompletion(); } @@ -1688,8 +1663,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } private void tearDown() { - TransformProcessingThreadTracker.recordProcessingThread( - Thread.currentThread().getId(), this.pTransformId); doFnInvoker.invokeTeardown(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java deleted file mode 100644 index ae69db3..0000000 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.fn.harness; - -import java.time.Duration; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; - -/** - * TransformProcessingThreadTracker tracks the thread ids for the transforms that are being - * processed in the SDK harness. - */ -public class TransformProcessingThreadTracker { - private static final TransformProcessingThreadTracker INSTANCE = - new TransformProcessingThreadTracker(); - private final Cache<Long, String> threadIdToTransformIdMappings; - - private TransformProcessingThreadTracker() { - this.threadIdToTransformIdMappings = - CacheBuilder.newBuilder().maximumSize(10000).expireAfterAccess(Duration.ofHours(1)).build(); - } - - public static TransformProcessingThreadTracker getInstance() { - return INSTANCE; - } - - public static Cache<Long, String> getThreadIdToTransformIdMappings() { - return getInstance().threadIdToTransformIdMappings; - } - - public static void recordProcessingThread(Long threadId, String transformId) { - getInstance().threadIdToTransformIdMappings.put(threadId, transformId); - } -} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index e3c3bd9..0fdf404 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -40,7 +40,6 @@ import java.util.logging.LogManager; import java.util.logging.LogRecord; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; -import org.apache.beam.fn.harness.TransformProcessingThreadTracker; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; @@ -217,13 +216,6 @@ public class BeamFnLoggingClient implements AutoCloseable { builder.setLogLocation(loggerName); } - String transformId = - TransformProcessingThreadTracker.getThreadIdToTransformIdMappings() - .getIfPresent((long) record.getThreadID()); - if (transformId != null) { - builder.setTransformId(transformId); - } - // The thread that sends log records should never perform a blocking publish and // only insert log records best effort. if (Thread.currentThread() != logEntryHandlerThread) {