This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4aebbc0391c [Playground] Log cancellation messages as warnings (#26790) 4aebbc0391c is described below commit 4aebbc0391c9e03e7bfdfd58f80986314c33ed18 Author: Timur Sultanov <timur.sulta...@akvelon.com> AuthorDate: Fri May 26 20:44:04 2023 +0400 [Playground] Log cancellation messages as warnings (#26790) * Log cancellation messages as warnings * Log compilation errors as warnings --- playground/backend/cmd/server/controller.go | 4 +- .../internal/code_processing/code_processing.go | 63 +++++++++++++++------- .../backend/internal/errors/lifecycle_error.go | 32 +++++++++++ 3 files changed, 78 insertions(+), 21 deletions(-) diff --git a/playground/backend/cmd/server/controller.go b/playground/backend/cmd/server/controller.go index 4e379be2595..606e5fd4dcf 100644 --- a/playground/backend/cmd/server/controller.go +++ b/playground/backend/cmd/server/controller.go @@ -570,7 +570,7 @@ func (controller *playgroundController) GetSnippet(ctx context.Context, info *pb func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) { commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 10, 64) if err != nil { - logger.Errorf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error()) + logger.Warnf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error()) commitTimestampInteger = 0 } @@ -587,7 +587,7 @@ func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.Get // verifyRouter verifies that controller is configured to work in router mode func (controller *playgroundController) verifyRouter() error { if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED { - return errors.New("runner mode") + return errors.New("server is in runner mode") } if controller.db == nil { return errors.New("no database service") diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go index d371860430e..06bc8335701 100644 --- a/playground/backend/internal/code_processing/code_processing.go +++ b/playground/backend/internal/code_processing/code_processing.go @@ -18,6 +18,7 @@ package code_processing import ( "bytes" "context" + "errors" "fmt" "io" "os" @@ -31,7 +32,7 @@ import ( pb "beam.apache.org/playground/backend/internal/api/v1" "beam.apache.org/playground/backend/internal/cache" "beam.apache.org/playground/backend/internal/environment" - "beam.apache.org/playground/backend/internal/errors" + perrors "beam.apache.org/playground/backend/internal/errors" "beam.apache.org/playground/backend/internal/executors" "beam.apache.org/playground/backend/internal/fs_tool" "beam.apache.org/playground/backend/internal/logger" @@ -68,13 +69,23 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults) if err != nil { - logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error()) + var pipelineCanceledError perrors.PipelineCanceledError + if errors.As(err, &pipelineCanceledError) { + logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error()) + } else { + logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error()) + } return } err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters()) if err != nil { - logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error()) + var pipelineCanceledError perrors.PipelineCanceledError + if errors.As(err, &pipelineCanceledError) { + logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error()) + } else { + logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error()) + } return } @@ -84,14 +95,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest) if err != nil { - logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error()) + var pipelineCanceledError perrors.PipelineCanceledError + var compilationError perrors.CompilationError + if errors.As(err, &pipelineCanceledError) { + logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error()) + } else if errors.As(err, &compilationError) { + logger.Warnf("%s: compilation error: %s", pipelineId, compilationError.Error()) + } else { + logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error()) + } return } // Run/RunTest err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions) if err != nil { - logger.Errorf("%s: error during run step: %s", pipelineId, err.Error()) + var pipelineCanceledError perrors.PipelineCanceledError + if errors.As(err, &pipelineCanceledError) { + logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error()) + } else { + logger.Errorf("%s: error during run step: %s", pipelineId, err.Error()) + } + return } } @@ -206,7 +231,7 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L if processingErr != nil { return processingErr } - return err + return perrors.CompilationError{Reason: err.Error()} } // Compile step is finished and code is compiled if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil { return err @@ -320,12 +345,12 @@ func GetProcessingOutput(ctx context.Context, cacheService cache.Cache, key uuid value, err := cacheService.GetValue(ctx, key, subKey) if err != nil { logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: error: %s", key, err.Error()) - return "", errors.NotFoundError(errorTitle, "Error during getting output") + return "", perrors.NotFoundError(errorTitle, "Error during getting output") } stringValue, converted := value.(string) if !converted { logger.Errorf("%s: couldn't convert value to string: %s", key, value) - return "", errors.InternalError(errorTitle, "Error during getting output") + return "", perrors.InternalError(errorTitle, "Error during getting output") } return stringValue, nil } @@ -337,12 +362,12 @@ func GetProcessingStatus(ctx context.Context, cacheService cache.Cache, key uuid value, err := cacheService.GetValue(ctx, key, cache.Status) if err != nil { logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: error: %s", key, err.Error()) - return pb.Status_STATUS_UNSPECIFIED, errors.NotFoundError(errorTitle, "Error during getting status") + return pb.Status_STATUS_UNSPECIFIED, perrors.NotFoundError(errorTitle, "Error during getting status") } statusValue, converted := value.(pb.Status) if !converted { logger.Errorf("%s: couldn't convert value to correct status enum: %s", key, value) - return pb.Status_STATUS_UNSPECIFIED, errors.InternalError(errorTitle, "Error during getting status") + return pb.Status_STATUS_UNSPECIFIED, perrors.InternalError(errorTitle, "Error during getting status") } return statusValue, nil } @@ -354,12 +379,12 @@ func GetLastIndex(ctx context.Context, cacheService cache.Cache, key uuid.UUID, value, err := cacheService.GetValue(ctx, key, subKey) if err != nil { logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", key, err.Error()) - return 0, errors.NotFoundError(errorTitle, "Error during getting pagination value") + return 0, perrors.NotFoundError(errorTitle, "Error during getting pagination value") } convertedValue, converted := value.(float64) if !converted { logger.Errorf("%s: couldn't convert value to float64. value: %s type %s", key, value, reflect.TypeOf(value)) - return 0, errors.InternalError(errorTitle, "Error during getting pagination value") + return 0, perrors.InternalError(errorTitle, "Error during getting pagination value") } return int(convertedValue), nil } @@ -371,12 +396,12 @@ func GetGraph(ctx context.Context, cacheService cache.Cache, key uuid.UUID, erro value, err := cacheService.GetValue(ctx, key, cache.Graph) if err != nil { logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, err.Error()) - return "", errors.NotFoundError(errorTitle, "Error during getting graph") + return "", perrors.NotFoundError(errorTitle, "Error during getting graph") } stringValue, converted := value.(string) if !converted { logger.Errorf("%s: couldn't convert value to string. value: %s type %s", key, value, reflect.TypeOf(value)) - return "", errors.InternalError(errorTitle, "Error during getting graph") + return "", perrors.InternalError(errorTitle, "Error during getting graph") } return stringValue, nil } @@ -414,14 +439,14 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uu if err := finishByTimeout(pipelineId, cacheService); err != nil { return false, fmt.Errorf("error during context timeout processing: %s", err.Error()) } - return false, fmt.Errorf("code processing context timeout") + return false, perrors.PipelineCanceledError{Reason: fmt.Sprintf("code processing context timeout")} case context.Canceled: if err := processCancel(cacheService, pipelineId); err != nil { return false, fmt.Errorf("error during cancellation processing: %s", err.Error()) } - return false, fmt.Errorf("code processing was canceled") + return false, perrors.PipelineCanceledError{Reason: "code processing was canceled"} default: - return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error()) + return false, fmt.Errorf("code processing cancelled due to unexpected reason: %s", contextErr.Error()) } case ok := <-successChannel: return ok, nil @@ -554,7 +579,7 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) { // finishByTimeout is used in case of runCode method finished by timeout func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error { - logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId) + logger.Warnf("%s: code processing finishes because of timeout\n", pipelineId) // set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT) @@ -562,7 +587,7 @@ func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error { // processErrorWithSavingOutput processes error with saving to cache received error output. func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error { - logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput) + logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput) if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil { logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error()) diff --git a/playground/backend/internal/errors/lifecycle_error.go b/playground/backend/internal/errors/lifecycle_error.go new file mode 100644 index 00000000000..a467bfd1a00 --- /dev/null +++ b/playground/backend/internal/errors/lifecycle_error.go @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errors + +type PipelineCanceledError struct { + Reason string +} + +func (e PipelineCanceledError) Error() string { + return e.Reason +} + +type CompilationError struct { + Reason string +} + +func (e CompilationError) Error() string { + return e.Reason +}