This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aebbc0391c [Playground] Log cancellation messages as warnings (#26790)
4aebbc0391c is described below

commit 4aebbc0391c9e03e7bfdfd58f80986314c33ed18
Author: Timur Sultanov <timur.sulta...@akvelon.com>
AuthorDate: Fri May 26 20:44:04 2023 +0400

    [Playground] Log cancellation messages as warnings (#26790)
    
    * Log cancellation messages as warnings
    
    * Log compilation errors as warnings
---
 playground/backend/cmd/server/controller.go        |  4 +-
 .../internal/code_processing/code_processing.go    | 63 +++++++++++++++-------
 .../backend/internal/errors/lifecycle_error.go     | 32 +++++++++++
 3 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/playground/backend/cmd/server/controller.go 
b/playground/backend/cmd/server/controller.go
index 4e379be2595..606e5fd4dcf 100644
--- a/playground/backend/cmd/server/controller.go
+++ b/playground/backend/cmd/server/controller.go
@@ -570,7 +570,7 @@ func (controller *playgroundController) GetSnippet(ctx 
context.Context, info *pb
 func (controller *playgroundController) GetMetadata(_ context.Context, _ 
*pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) {
        commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 
10, 64)
        if err != nil {
-               logger.Errorf("GetMetadata(): failed to parse 
BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
+               logger.Warnf("GetMetadata(): failed to parse 
BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
                commitTimestampInteger = 0
        }
 
@@ -587,7 +587,7 @@ func (controller *playgroundController) GetMetadata(_ 
context.Context, _ *pb.Get
 // verifyRouter verifies that controller is configured to work in router mode
 func (controller *playgroundController) verifyRouter() error {
        if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED {
-               return errors.New("runner mode")
+               return errors.New("server is in runner mode")
        }
        if controller.db == nil {
                return errors.New("no database service")
diff --git a/playground/backend/internal/code_processing/code_processing.go 
b/playground/backend/internal/code_processing/code_processing.go
index d371860430e..06bc8335701 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -18,6 +18,7 @@ package code_processing
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "os"
@@ -31,7 +32,7 @@ import (
        pb "beam.apache.org/playground/backend/internal/api/v1"
        "beam.apache.org/playground/backend/internal/cache"
        "beam.apache.org/playground/backend/internal/environment"
-       "beam.apache.org/playground/backend/internal/errors"
+       perrors "beam.apache.org/playground/backend/internal/errors"
        "beam.apache.org/playground/backend/internal/executors"
        "beam.apache.org/playground/backend/internal/fs_tool"
        "beam.apache.org/playground/backend/internal/logger"
@@ -68,13 +69,23 @@ func Process(ctx context.Context, cacheService cache.Cache, 
lc *fs_tool.LifeCycl
 
        err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, 
pipelineId, sdkEnv, &validationResults)
        if err != nil {
-               logger.Errorf("%s: error during validation step: %s", 
pipelineId, err.Error())
+               var pipelineCanceledError perrors.PipelineCanceledError
+               if errors.As(err, &pipelineCanceledError) {
+                       logger.Warnf("%s: pipeline execution has been canceled: 
%s", pipelineId, pipelineCanceledError.Error())
+               } else {
+                       logger.Errorf("%s: error during validation step: %s", 
pipelineId, err.Error())
+               }
                return
        }
 
        err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, 
pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
        if err != nil {
-               logger.Errorf("%s: error during preparation step: %s", 
pipelineId, err.Error())
+               var pipelineCanceledError perrors.PipelineCanceledError
+               if errors.As(err, &pipelineCanceledError) {
+                       logger.Warnf("%s: pipeline execution has been canceled: 
%s", pipelineId, pipelineCanceledError.Error())
+               } else {
+                       logger.Errorf("%s: error during preparation step: %s", 
pipelineId, err.Error())
+               }
                return
        }
 
@@ -84,14 +95,28 @@ func Process(ctx context.Context, cacheService cache.Cache, 
lc *fs_tool.LifeCycl
 
        err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, 
pipelineId, sdkEnv, isUnitTest)
        if err != nil {
-               logger.Errorf("%s: error during compilation step: %s", 
pipelineId, err.Error())
+               var pipelineCanceledError perrors.PipelineCanceledError
+               var compilationError perrors.CompilationError
+               if errors.As(err, &pipelineCanceledError) {
+                       logger.Warnf("%s: pipeline execution has been canceled: 
%s", pipelineId, pipelineCanceledError.Error())
+               } else if errors.As(err, &compilationError) {
+                       logger.Warnf("%s: compilation error: %s", pipelineId, 
compilationError.Error())
+               } else {
+                       logger.Errorf("%s: error during compilation step: %s", 
pipelineId, err.Error())
+               }
                return
        }
 
        // Run/RunTest
        err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, 
pipelineId, isUnitTest, sdkEnv, pipelineOptions)
        if err != nil {
-               logger.Errorf("%s: error during run step: %s", pipelineId, 
err.Error())
+               var pipelineCanceledError perrors.PipelineCanceledError
+               if errors.As(err, &pipelineCanceledError) {
+                       logger.Warnf("%s: pipeline execution has been canceled: 
%s", pipelineId, pipelineCanceledError.Error())
+               } else {
+                       logger.Errorf("%s: error during run step: %s", 
pipelineId, err.Error())
+               }
+               return
        }
 }
 
@@ -206,7 +231,7 @@ func compileStep(ctx context.Context, cacheService 
cache.Cache, paths *fs_tool.L
                        if processingErr != nil {
                                return processingErr
                        }
-                       return err
+                       return perrors.CompilationError{Reason: err.Error()}
                } // Compile step is finished and code is compiled
                if err := processCompileSuccess(compileOutput.Bytes(), 
pipelineId, cacheService); err != nil {
                        return err
@@ -320,12 +345,12 @@ func GetProcessingOutput(ctx context.Context, 
cacheService cache.Cache, key uuid
        value, err := cacheService.GetValue(ctx, key, subKey)
        if err != nil {
                logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: 
error: %s", key, err.Error())
-               return "", errors.NotFoundError(errorTitle, "Error during 
getting output")
+               return "", perrors.NotFoundError(errorTitle, "Error during 
getting output")
        }
        stringValue, converted := value.(string)
        if !converted {
                logger.Errorf("%s: couldn't convert value to string: %s", key, 
value)
-               return "", errors.InternalError(errorTitle, "Error during 
getting output")
+               return "", perrors.InternalError(errorTitle, "Error during 
getting output")
        }
        return stringValue, nil
 }
@@ -337,12 +362,12 @@ func GetProcessingStatus(ctx context.Context, 
cacheService cache.Cache, key uuid
        value, err := cacheService.GetValue(ctx, key, cache.Status)
        if err != nil {
                logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: 
error: %s", key, err.Error())
-               return pb.Status_STATUS_UNSPECIFIED, 
errors.NotFoundError(errorTitle, "Error during getting status")
+               return pb.Status_STATUS_UNSPECIFIED, 
perrors.NotFoundError(errorTitle, "Error during getting status")
        }
        statusValue, converted := value.(pb.Status)
        if !converted {
                logger.Errorf("%s: couldn't convert value to correct status 
enum: %s", key, value)
-               return pb.Status_STATUS_UNSPECIFIED, 
errors.InternalError(errorTitle, "Error during getting status")
+               return pb.Status_STATUS_UNSPECIFIED, 
perrors.InternalError(errorTitle, "Error during getting status")
        }
        return statusValue, nil
 }
@@ -354,12 +379,12 @@ func GetLastIndex(ctx context.Context, cacheService 
cache.Cache, key uuid.UUID,
        value, err := cacheService.GetValue(ctx, key, subKey)
        if err != nil {
                logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", 
key, err.Error())
-               return 0, errors.NotFoundError(errorTitle, "Error during 
getting pagination value")
+               return 0, perrors.NotFoundError(errorTitle, "Error during 
getting pagination value")
        }
        convertedValue, converted := value.(float64)
        if !converted {
                logger.Errorf("%s: couldn't convert value to float64. value: %s 
type %s", key, value, reflect.TypeOf(value))
-               return 0, errors.InternalError(errorTitle, "Error during 
getting pagination value")
+               return 0, perrors.InternalError(errorTitle, "Error during 
getting pagination value")
        }
        return int(convertedValue), nil
 }
@@ -371,12 +396,12 @@ func GetGraph(ctx context.Context, cacheService 
cache.Cache, key uuid.UUID, erro
        value, err := cacheService.GetValue(ctx, key, cache.Graph)
        if err != nil {
                logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, 
err.Error())
-               return "", errors.NotFoundError(errorTitle, "Error during 
getting graph")
+               return "", perrors.NotFoundError(errorTitle, "Error during 
getting graph")
        }
        stringValue, converted := value.(string)
        if !converted {
                logger.Errorf("%s: couldn't convert value to string. value: %s 
type %s", key, value, reflect.TypeOf(value))
-               return "", errors.InternalError(errorTitle, "Error during 
getting graph")
+               return "", perrors.InternalError(errorTitle, "Error during 
getting graph")
        }
        return stringValue, nil
 }
@@ -414,14 +439,14 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx 
context.Context, pipelineId uu
                        if err := finishByTimeout(pipelineId, cacheService); 
err != nil {
                                return false, fmt.Errorf("error during context 
timeout processing: %s", err.Error())
                        }
-                       return false, fmt.Errorf("code processing context 
timeout")
+                       return false, perrors.PipelineCanceledError{Reason: 
fmt.Sprintf("code processing context timeout")}
                case context.Canceled:
                        if err := processCancel(cacheService, pipelineId); err 
!= nil {
                                return false, fmt.Errorf("error during 
cancellation processing: %s", err.Error())
                        }
-                       return false, fmt.Errorf("code processing was canceled")
+                       return false, perrors.PipelineCanceledError{Reason: 
"code processing was canceled"}
                default:
-                       return false, fmt.Errorf("code processing cancelled: 
%s", contextErr.Error())
+                       return false, fmt.Errorf("code processing cancelled due 
to unexpected reason: %s", contextErr.Error())
                }
        case ok := <-successChannel:
                return ok, nil
@@ -554,7 +579,7 @@ func DeleteResources(pipelineId uuid.UUID, lc 
*fs_tool.LifeCycle) {
 
 // finishByTimeout is used in case of runCode method finished by timeout
 func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
-       logger.Errorf("%s: code processing finishes because of timeout\n", 
pipelineId)
+       logger.Warnf("%s: code processing finishes because of timeout\n", 
pipelineId)
 
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        return utils.SetToCache(cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
@@ -562,7 +587,7 @@ func finishByTimeout(pipelineId uuid.UUID, cacheService 
cache.Cache) error {
 
 // processErrorWithSavingOutput processes error with saving to cache received 
error output.
 func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId 
uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, 
newStatus pb.Status) error {
-       logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, 
errorTitle, err.Error(), errorOutput)
+       logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, 
err.Error(), errorOutput)
 
        if err := utils.SetToCache(cacheService, pipelineId, subKey, 
fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
                logger.Errorf("%s: failed to save error message to cache: %s", 
pipelineId, err.Error())
diff --git a/playground/backend/internal/errors/lifecycle_error.go 
b/playground/backend/internal/errors/lifecycle_error.go
new file mode 100644
index 00000000000..a467bfd1a00
--- /dev/null
+++ b/playground/backend/internal/errors/lifecycle_error.go
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package errors
+
+type PipelineCanceledError struct {
+       Reason string
+}
+
+func (e PipelineCanceledError) Error() string {
+       return e.Reason
+}
+
+type CompilationError struct {
+       Reason string
+}
+
+func (e CompilationError) Error() string {
+       return e.Reason
+}

Reply via email to