I have Apicurio Schema Registry 3.0.7 installed, and I'm trying to use the following code to publish a version 2 of schema where GROUP = default, ARTIFACT_ID = com.versa.apicurio.confluent.Employee
here is the code import requests import json import urllib.parse # ───────────────────────────────────────────────────────────────────────────── # CONFIGURATION – update these values! # ───────────────────────────────────────────────────────────────────────────── KEYCLOAK_URL = "https://keycloak.vkp.versa-vani.com/realms/readonly-realm" CLIENT_ID = "apicurio-registry" CLIENT_SECRET = "<secret>" # ← your client secret here REGISTRY_URL = "https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3" GROUP = "default" ARTIFACT_ID = "com.versa.apicurio.confluent.Employee" # ───────────────────────────────────────────────────────────────────────────── def get_token(): """ Use client_credentials grant to obtain a service-account token. """ token_url = f"{KEYCLOAK_URL}/protocol/openid-connect/token" data = { "grant_type": "client_credentials" } # Use HTTP Basic auth with client_id:client_secret resp = requests.post(token_url, data=data, auth=(CLIENT_ID, CLIENT_SECRET)) resp.raise_for_status() return resp.json()["access_token"] def publish_schema(token): """ Publish (register) an Avro schema under GROUP/artifactId. If artifact exists → create new version. If not → create artifact first time. """ url_check = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}" headers = { "Authorization": f"Bearer {token}" } check_response = requests.get(url_check, headers=headers) print(f" check_response.status_code : {check_response.status_code}") schema = { "type": "record", "name": "Employee", "namespace": "com.versa.apicurio.confluent", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "salary", "type": ["null", "float"], "default": None}, {"name": "age", "type": ["null", "int"], "default": None}, { "name": "department", "type": { "type": "enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING", "SALES"] } }, {"name": "email", "type": ["null", "string"], "default": None}, {"name": "new_col", "type": ["null", "string"], "default": None}, {"name": "new_col2", "type": ["null", "string"], "default": None}, {"name": "new_col3", "type": ["null", "string"], "default": None}, ] } schema_str = json.dumps(schema) # If artifact exists, upload a new version if check_response.status_code == 200: print("Artifact exists, uploading a new version...") url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions" headers_publish = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # For version updates in Apicurio 3.0.7, send the schema directly as content resp = requests.post( url_publish, headers=headers_publish, data=schema_str # Send the schema JSON as the request body ) print("Publish response:", resp.status_code, resp.text) elif check_response.status_code == 404: print("Artifact not found, creating new artifact...") url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts" # For creating a new artifact headers_create = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # Add X-Registry-ArtifactId header for the new artifact headers_create["X-Registry-ArtifactId"] = ARTIFACT_ID # Add X-Registry-ArtifactType header headers_create["X-Registry-ArtifactType"] = "AVRO" # Send the schema directly as the request body resp = requests.post( url_publish, headers=headers_create, data=schema_str # Send the schema JSON as the request body ) else: print(f"Unexpected error while checking artifact: {check_response.status_code} - {check_response.text}") check_response.raise_for_status() return print("Publish response:", resp.status_code, resp.text) if resp.ok: print("Schema published successfully!") if 'globalId' in resp.json(): print("Registered globalId:", resp.json().get("globalId")) else: print("Error publishing schema:", resp.status_code, resp.text) resp.raise_for_status() def get_schemas_and_versions(token, artifact_id): """ List all artifacts and their versions with globalIds in the specified GROUP. For each version, print the schema content. """ url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json" } resp = requests.get(url, headers=headers) print("Get schemas response:", resp.status_code) resp.raise_for_status() # Print the raw response for debugging print("Raw artifacts response:", json.dumps(resp.json(), indent=2)) # Handle both possible response formats artifacts_data = resp.json() print(" type(resp.json()) -> ", type(resp.json())) print(f" artifacts_data : {artifacts_data}") if isinstance(artifacts_data, dict): artifacts = artifacts_data.get("artifacts", []) elif isinstance(artifacts_data, list): artifacts = artifacts_data else: print("Unexpected response format for artifacts list!") return print(f"Found {len(artifacts)} artifacts in group `{GROUP}`:") for art in artifacts: print(f" - Artifact: {art}") # Handle both dict and string artifact representations if isinstance(art, dict): artifact_id_resp = art.get('artifactId') created_by = art.get('createdBy', '<unknown>') else: artifact_id_resp = art created_by = '<unknown>' print(f" artifactId : {artifact_id_resp}, created_by : {created_by}") if not artifact_id_resp: print(f" - Skipping artifact with missing ID: {art}") continue if artifact_id != artifact_id_resp: continue print(f" - Artifact ID: {artifact_id_resp}, createdBy: {created_by}") # Get all versions for this artifact versions_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions" versions_resp = requests.get(versions_url, headers=headers) print("Get versions response:", versions_resp.status_code) if versions_resp.status_code == 200: versions_data = versions_resp.json().get('versions', []) print(f"Found {len(versions_data)} versions for artifact `{artifact_id_resp}`:") print(f" - Versions data: {versions_data}") version_ids = [v.get('version') for v in versions_data] print(f" Versions: {version_ids}") # For each version, print the globalId and schema content for version in version_ids: # version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/meta" version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}" version_meta_resp = requests.get(version_meta_url, headers=headers) print(" Get version metadata response:", version_meta_resp.status_code) if version_meta_resp.status_code == 200: version_meta = version_meta_resp.json() global_id = version_meta.get('globalId', '<not found>') print(f" - Version: {version}, globalId: {global_id}") # Fetch and print the schema content content_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/content" content_resp = requests.get(content_url, headers=headers) if content_resp.status_code == 200: print(f" Schema content for version {version}:\n{content_resp.text}") else: print(f" Failed to get schema content for version {version}: {content_resp.status_code} - {content_resp.text}") else: print(f" - Failed to get version {version} metadata: {version_meta_resp.status_code} - {version_meta_resp.text}") else: print(f" Failed to get versions: {versions_resp.status_code} - {versions_resp.text}") return artifacts # ───────────────────────────────────────────────────────────────────────────── if __name__ == "__main__": token = get_token() print("Token acquired, length:", len(token)) print(" toekn : ", token) # Register your Employee schema publish_schema(token) print("calling - get_schemas_and_versions") # List existing schemas get_schemas_and_versions(token, ARTIFACT_ID) # Optionally, list all schemas # get_schemas(token) Note - the first version of the schema gets published successfully, but the 2nd version is failing .. here is the error check_response.status_code : 200 Artifact exists, uploading a new version... Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"} Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"} Error publishing schema: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"} Traceback (most recent call last): File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 405, in <module> publish_schema(token) File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 124, in publish_schema resp.raise_for_status() File "/opt/homebrew/lib/python3.9/site-packages/requests/models.py", line 1024, in raise_for_status raise HTTPError(http_error_msg, response=self) requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3/groups/default/artifacts/com.versa.apicurio.confluent.Employee/version Issue seems to be with this part of the code if check_response.status_code == 200: print("Artifact exists, uploading a new version...") url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions" headers_publish = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # For version updates in Apicurio 3.0.7, send the schema directly as content resp = requests.post( url_publish, headers=headers_publish, data=schema_str # Send the schema JSON as the request body ) print("Publish response:", resp.status_code, resp.text) How do i debug/fix this ? tia!