skrawcz commented on code in PR #644: URL: https://github.com/apache/burr/pull/644#discussion_r2749872005
########## .claude/skills/burr/examples.md: ########## @@ -0,0 +1,629 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Apache Burr Code Examples + +Common patterns and working examples for building Burr applications. + +## Table of Contents +1. [Simple Counter](#simple-counter) +2. [Basic Chatbot](#basic-chatbot) +3. [Multi-Step Workflow](#multi-step-workflow) +4. [Conditional Branching](#conditional-branching) +5. [Looping with Conditions](#looping-with-conditions) +6. [Error Handling](#error-handling) +7. [Streaming Actions](#streaming-actions) +8. [Parallel Execution](#parallel-execution) +9. [State Persistence](#state-persistence) +10. [RAG Pattern](#rag-pattern) + +--- + +## Simple Counter + +Minimal example showing state updates and transitions. + +```python +from burr.core import action, State, ApplicationBuilder, default, expr + +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + return state.update(counter=state["counter"] + 1) + +@action(reads=["counter"], writes=["result"]) +def finish(state: State) -> State: + return state.update(result=f"Final count: {state['counter']}") + +app = ( + ApplicationBuilder() + .with_actions(increment, finish) + .with_transitions( + ("increment", "increment", expr("counter < 10")), + ("increment", "finish", default) + ) + .with_state(counter=0) + .with_entrypoint("increment") + .build() +) + +_, _, final_state = app.run(halt_after=["finish"]) +print(final_state["result"]) # "Final count: 10" +``` + +## Basic Chatbot + +Classic chatbot pattern with user input and AI response. + +```python +from burr.core import action, State, ApplicationBuilder, default + +@action(reads=[], writes=["chat_history", "prompt"]) +def human_input(state: State, prompt: str) -> State: + """Capture user input.""" + chat_item = {"role": "user", "content": prompt} + return ( + state.update(prompt=prompt) + .append(chat_history=chat_item) + ) + +@action(reads=["chat_history"], writes=["response", "chat_history"]) +def ai_response(state: State) -> State: + """Generate AI response.""" + # Call your LLM here + response = call_llm(state["chat_history"]) + chat_item = {"role": "assistant", "content": response} + return ( + state.update(response=response) + .append(chat_history=chat_item) + ) + +app = ( + ApplicationBuilder() + .with_actions(human_input, ai_response) + .with_transitions( + ("human_input", "ai_response"), + ("ai_response", "human_input") + ) + .with_state(chat_history=[]) + .with_entrypoint("human_input") + .with_tracker("local", project="chatbot") + .build() +) + +# Run one turn of conversation +_, _, state = app.run( + halt_after=["ai_response"], + inputs={"prompt": "Hello, how are you?"} +) +print(state["response"]) +``` + +## Multi-Step Workflow + +Chain multiple actions sequentially. + +```python +@action(reads=["raw_text"], writes=["cleaned_text"]) +def clean_text(state: State) -> State: + """Remove special characters and normalize.""" + cleaned = state["raw_text"].lower().strip() + return state.update(cleaned_text=cleaned) + +@action(reads=["cleaned_text"], writes=["tokens"]) +def tokenize(state: State) -> State: + """Split into tokens.""" + tokens = state["cleaned_text"].split() + return state.update(tokens=tokens) + +@action(reads=["tokens"], writes=["summary"]) +def summarize(state: State) -> State: + """Generate summary.""" + summary = f"Processed {len(state['tokens'])} tokens" + return state.update(summary=summary) + +app = ( + ApplicationBuilder() + .with_actions(clean_text, tokenize, summarize) + .with_transitions( + ("clean_text", "tokenize"), + ("tokenize", "summarize") + ) + .with_state(raw_text=" Hello World! ") + .with_entrypoint("clean_text") + .build() +) + +_, _, final_state = app.run(halt_after=["summarize"]) +``` + +## Conditional Branching + +Route execution based on state values. + +```python +from burr.core import when + +@action(reads=["user_type"], writes=["message"]) +def check_user_type(state: State, user_type: str) -> State: + return state.update(user_type=user_type) + +@action(reads=[], writes=["greeting"]) +def admin_greeting(state: State) -> State: + return state.update(greeting="Welcome, Administrator!") + +@action(reads=[], writes=["greeting"]) +def user_greeting(state: State) -> State: + return state.update(greeting="Welcome, User!") + +@action(reads=[], writes=["greeting"]) +def guest_greeting(state: State) -> State: + return state.update(greeting="Welcome, Guest!") + +app = ( + ApplicationBuilder() + .with_actions( + check_user_type, + admin_greeting, + user_greeting, + guest_greeting + ) + .with_transitions( + ("check_user_type", "admin_greeting", when(user_type="admin")), + ("check_user_type", "user_greeting", when(user_type="user")), + ("check_user_type", "guest_greeting", default) + ) + .with_entrypoint("check_user_type") + .build() +) + +_, _, state = app.run( + halt_after=["admin_greeting", "user_greeting", "guest_greeting"], + inputs={"user_type": "admin"} +) +``` + +## Looping with Conditions + +Implement loops using recursive transitions. + +```python +@action(reads=["items", "processed"], writes=["processed", "current_item"]) +def process_item(state: State) -> State: + """Process next item from list.""" + items = state["items"] + processed_count = state.get("processed", 0) + + current_item = items[processed_count] + # Process the item + result = transform(current_item) + + return state.update( + processed=processed_count + 1, + current_item=result + ) + +@action(reads=["processed"], writes=["done"]) +def finish_processing(state: State) -> State: + return state.update(done=True) + +app = ( + ApplicationBuilder() + .with_actions(process_item, finish_processing) + .with_transitions( + ("process_item", "process_item", expr("processed < len(items)")), + ("process_item", "finish_processing", default) + ) + .with_state(items=["a", "b", "c"], processed=0) + .with_entrypoint("process_item") + .build() +) +``` + +## Error Handling + +Handle errors gracefully by routing to error actions. + +```python +@action(reads=["data"], writes=["result", "error"]) +def risky_operation(state: State) -> State: + """Operation that might fail.""" + try: + result = dangerous_function(state["data"]) + return state.update(result=result, error=None) + except Exception as e: + return state.update(result=None, error=str(e)) + +@action(reads=["result"], writes=["success_message"]) +def handle_success(state: State) -> State: + return state.update(success_message=f"Success: {state['result']}") + +@action(reads=["error"], writes=["error_message"]) +def handle_error(state: State) -> State: + return state.update(error_message=f"Error: {state['error']}") + +@action(reads=["data"], writes=["result", "retry_count"]) +def retry_operation(state: State) -> State: + """Retry the operation.""" + retry_count = state.get("retry_count", 0) + 1 + try: + result = dangerous_function(state["data"]) + return state.update(result=result, error=None, retry_count=retry_count) + except Exception as e: + return state.update(result=None, error=str(e), retry_count=retry_count) + +app = ( + ApplicationBuilder() + .with_actions( + risky_operation, + handle_success, + handle_error, + retry_operation + ) + .with_transitions( + ("risky_operation", "handle_success", when(error=None)), + ("risky_operation", "retry_operation", + expr("error is not None and retry_count < 3")), + ("risky_operation", "handle_error", default), + ("retry_operation", "handle_success", when(error=None)), + ("retry_operation", "retry_operation", + expr("error is not None and retry_count < 3")), + ("retry_operation", "handle_error", default) + ) + .with_state(data="input", retry_count=0) + .with_entrypoint("risky_operation") + .build() +) +``` + +## Streaming Actions + +Stream intermediate results as they're generated. + +```python +from typing import Generator, Tuple + +@action(reads=["prompt"], writes=["response", "chunks"]) +def streaming_llm(state: State) -> Generator[State, None, Tuple[dict, State]]: + """Stream LLM response token by token.""" + chunks = [] + + # Stream tokens from LLM + for token in llm_stream(state["prompt"]): + chunks.append(token) + # Yield intermediate state + yield state.update( + chunks=chunks, + response="".join(chunks) + ) + + # Return final result + final_response = "".join(chunks) + result = {"response": final_response} + return result, state.update(**result) + +app = ( + ApplicationBuilder() + .with_actions(streaming_llm) + .with_state(prompt="Write a story") + .with_entrypoint("streaming_llm") + .build() +) + +# Stream results +for state in app.stream_result(halt_after=["streaming_llm"]): + print(state["response"], end="", flush=True) +``` + +## Parallel Execution + +Execute multiple actions in parallel. + +```python +from burr.core import graph + +@action(reads=["text"], writes=["sentiment"]) +def analyze_sentiment(state: State) -> State: + sentiment = get_sentiment(state["text"]) + return state.update(sentiment=sentiment) + +@action(reads=["text"], writes=["entities"]) +def extract_entities(state: State) -> State: + entities = extract_ner(state["text"]) + return state.update(entities=entities) + +@action(reads=["text"], writes=["keywords"]) +def extract_keywords(state: State) -> State: + keywords = get_keywords(state["text"]) + return state.update(keywords=keywords) + +@action(reads=["sentiment", "entities", "keywords"], writes=["analysis"]) +def combine_results(state: State) -> State: + """Combine all analysis results.""" + analysis = { + "sentiment": state["sentiment"], + "entities": state["entities"], + "keywords": state["keywords"] + } + return state.update(analysis=analysis) + +# Use graph builder for parallel execution +g = ( + graph.GraphBuilder() + .with_actions( + analyze_sentiment, + extract_entities, + extract_keywords, + combine_results + ) + .with_transitions( + # These three run in parallel + ("start", "analyze_sentiment"), + ("start", "extract_entities"), + ("start", "extract_keywords"), Review Comment: this is incorrect... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
