Orchestrate a full ETL pipeline, inserting z/OS VSAM KSDS records into an AWS RDS Postgres instance using Grace.
This tutorial will guide you through using Grace to orchestrate a full ETL pipeline - extracting VSAM KSDS records from z/OS, running transformations, and inserting into a PostgreSQL instance hosted via AWS RDS.
You will learn how to:
Write and compile a COBOL program, then run it as an execute job to convert a VSAM dataset into a sequential text dataset.
Utilize Grace's shell job module to run a Python CSV transformation script.
Utilize Grace's shell job module to batch insert CSV records into a PostgreSQL instance in the cloud.
This is a common scenario for exposing mainframe records to a user-facing application.
You will configure a grace.yml file that defines a multi-step pipeline. This pipeline will:
Extract (E): Execute a COBOL program on z/OS to read records from a VSAM Key-Sequenced Data Set (KSDS) and write them out to a sequential text file (e.g., fixed-width or simple character-separated).
Transform (T): Use a Python script (via a shell job) to:
Parse the text records.
Convert them into a standard Comma-Separated Values (CSV) format.
Perform any necessary data cleaning or simple transformations.
Load (L): Use another Python script (via a shell job) to:
Read the processed CSV file.
Connect to an AWS RDS PostgreSQL database instance.
Insert the records from the CSV into a PostgreSQL table.
This demonstrates a hybrid workflow where Grace seamlessly orchestrates tasks across both the mainframe and your local/cloud environment.
Allocate PDS datasets for JCL, source code, and load modules (e.g., YOURHLQ.GRACE.JCL, YOURHLQ.GRACE.SOURCE, YOURHLQ.GRACE.LOADLIB).
COBOL Compiler: Access to an IBM Enterprise COBOL compiler (or compatible) on your z/OS system. You'll need to know the DSN of its load library for the STEPLIB.
Sample VSAM KSDS: You'll need a VSAM KSDS file to extract data from. Later in this tutorial, we'll provide sample JCL to create and populate one if you don't have a suitable test file.
Network Accessibility: Ensure the RDS instance's security group allows inbound connections on the PostgreSQL port (default 5432) from the IP address of the machine where you will be running grace (and therefore the Python loading script).
Database Credentials: Note down the following for your RDS instance:
Endpoint (hostname)
Port (e.g., 5432)
Database name
Master username (or a dedicated user with insert/create table permissions)
Master password (or the dedicated user's password)
⚠️ For simplicity, this tutorial will use a .env file to provide these as environment variables to the Python script. In a production scenario, you may want to use more secure methods like AWS Secrets Manager or IAM database authentication.
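For example, a minimal .env file in the project root might look like the following (all values are placeholders — substitute your own RDS endpoint and credentials, and keep this file out of version control). The variables are exported so they remain visible to the Python process the shell job launches later:

export PG_HOST=your-instance.abc123xyz.us-east-1.rds.amazonaws.com
export PG_PORT=5432
export PG_DBNAME=etldemo
export PG_USER=etl_user
export PG_PASSWORD=change-me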
Local Development Environment:
Python 3: Installed on your machine.
psycopg2 package: The PostgreSQL adapter for Python. Install it using pip:
pip install psycopg2-binary
(Optional) AWS CLI: While not strictly required if you manually manage credentials, having the AWS CLI installed and configured can be helpful for AWS interactions in more advanced scenarios (e.g., fetching secrets).
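For instance, if you manage the RDS security group with the AWS CLI, an ingress rule like the following opens the PostgreSQL port to a single workstation IP (the security group ID and CIDR below are placeholders — adjust them for your VPC):

aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 --protocol tcp --port 5432 --cidr 203.0.113.25/32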
Once these prerequisites are in place, you're ready to start building your Grace workflow.
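If you haven't scaffolded a project yet, create one now. This tutorial assumes a project directory named vsam-postgres-etl created with Grace's init command (the exact prompts may vary by version):

grace init
cd vsam-postgres-etl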
Follow the interactive prompts to provide a workflow name, high-level qualifier, and your Zowe profile name of choice. You can always edit these later in grace.yml. For this tutorial, let's assume:
HLQ: YOURHLQ (Replace this with your actual HLQ)
Profile: myzos (Replace this with your working Zowe profile)
See Configuration if you haven't already configured Zowe for z/OS access and authentication.
If you don't have a suitable VSAM KSDS for testing, you can use the following JCL to define and load a small sample.
Create a file named initvsam.jcl with the following content.
⚠️ Remember to replace YOURHLQ with your actual high-level qualifier, and VOLSER with a suitable volume serial for new dataset allocations on your system.
//INITVSAM JOB (ACCT),'DEFINE & LOAD VSAM',CLASS=A,MSGCLASS=X,
//             MSGLEVEL=(1,1)
//********************************************************************
//* DELETE ANY PREVIOUS VERSIONS (IGNORE RC IF NOT FOUND)
//********************************************************************
//DELVSAM  EXEC PGM=IDCAMS
//SYSPRINT DD SYSOUT=*
//SYSIN    DD *
  DELETE YOURHLQ.GRACE.VSAMDEMO.KSDS CLUSTER PURGE
  SET MAXCC=0
/*
//********************************************************************
//* DEFINE THE VSAM KSDS CLUSTER
//********************************************************************
//DEFVSAM  EXEC PGM=IDCAMS
//SYSPRINT DD SYSOUT=*
//SYSIN    DD *
  DEFINE CLUSTER (NAME(YOURHLQ.GRACE.VSAMDEMO.KSDS) -
         INDEXED -
         RECORDS(100 50)    /* PRIMARY, SECONDARY */ -
         FREESPACE(10 10) -
         KEYS(5 0)          /* KEY IS 5 BYTES AT OFFSET 0 */ -
         RECORDSIZE(80 80)  /* AVERAGE, MAXIMUM */ -
         SHAREOPTIONS(2 3) -
         VOLUMES(VOLSER))   /* REPLACE WITH YOUR DASD VOLSER */ -
         DATA (NAME(YOURHLQ.GRACE.VSAMDEMO.KSDS.DATA) -
         CONTROLINTERVALSIZE(4096)) -
         INDEX (NAME(YOURHLQ.GRACE.VSAMDEMO.KSDS.INDEX) -
         CONTROLINTERVALSIZE(1024))
/*
//********************************************************************
//* LOAD SAMPLE RECORDS INTO THE VSAM KSDS
//********************************************************************
//LOADVSAM EXEC PGM=IDCAMS
//SYSPRINT DD SYSOUT=*
//VSAMOUT  DD DSN=YOURHLQ.GRACE.VSAMDEMO.KSDS,DISP=OLD
//SYSIN    DD *
  REPRO INFILE(INDATA) OUTDATASET(YOURHLQ.GRACE.VSAMDEMO.KSDS)
/*
//INDATA   DD *,DLM='@@'
00001John Doe            Engineer            New York
00002Jane Smith          Analyst             Chicago
00003Peter Jones         Manager             London
00004Alice Williams      Developer           San Francisco
00005Bob Brown           Architect           New York
00006Carol White         Tester              Chicago
00007David Green         Support             London
00008Eve Black           Admin               San Francisco
@@
//
Submit this JCL to your z/OS system and ensure it completes with MAXCC=0.
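If you prefer submitting from your workstation, the Zowe CLI profile you configured earlier can do it (any submission method works):

zowe zos-jobs submit local-file initvsam.jcl --view-all-spool-content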
Next, we'll write a COBOL program that reads this VSAM KSDS and writes selected fields to a sequential output file in a fixed-width text format.
Inside your Grace project's src/ directory (vsam-postgres-etl/src/), create a file named vsamextr.cbl with the following content:
       IDENTIFICATION DIVISION.
       PROGRAM-ID. VSAMEXTR.
       AUTHOR. Grace Trainee.
      *----------------------------------------------------------------*
      * This program reads a VSAM KSDS and writes selected fields
      * to a sequential output file.
      *----------------------------------------------------------------*
       ENVIRONMENT DIVISION.
       INPUT-OUTPUT SECTION.
       FILE-CONTROL.
           SELECT VSAM-FILE-IN ASSIGN TO VSAMIN
               ORGANIZATION IS INDEXED
               ACCESS MODE IS SEQUENTIAL
               RECORD KEY IS VSAM-IN-EMPL-ID
               FILE STATUS IS WS-VSAM-STATUS.
           SELECT TEXT-FILE-OUT ASSIGN TO TEXTOUT
               ORGANIZATION IS SEQUENTIAL
               ACCESS MODE IS SEQUENTIAL
               FILE STATUS IS WS-TEXT-STATUS.
       DATA DIVISION.
       FILE SECTION.
       FD  VSAM-FILE-IN
           RECORD CONTAINS 80 CHARACTERS.
       01  VSAM-IN-REC.
           05 VSAM-IN-EMPL-ID      PIC X(05).
           05 VSAM-IN-NAME         PIC X(20).
           05 VSAM-IN-TITLE        PIC X(20).
           05 VSAM-IN-LOCATION     PIC X(20).
           05 FILLER               PIC X(15).
       FD  TEXT-FILE-OUT
           RECORD CONTAINS 65 CHARACTERS.
       01  TEXT-OUT-REC            PIC X(65).
       WORKING-STORAGE SECTION.
       01  WS-FILE-STATUS-CODES.
           05 WS-VSAM-STATUS       PIC XX.
              88 VSAM-OK           VALUE '00'.
              88 VSAM-EOF          VALUE '10'.
           05 WS-TEXT-STATUS       PIC XX.
              88 TEXT-OK           VALUE '00'.
       01  WS-OUTPUT-RECORD.
           05 WS-OUT-EMPL-ID       PIC X(05).
           05 WS-OUT-SEPARATOR-1   PIC X(01) VALUE ','.
           05 WS-OUT-NAME          PIC X(20).
           05 WS-OUT-SEPARATOR-2   PIC X(01) VALUE ','.
           05 WS-OUT-TITLE         PIC X(20).
           05 WS-OUT-SEPARATOR-3   PIC X(01) VALUE ','.
           05 WS-OUT-LOCATION      PIC X(20).
       01  WS-FIXED-WIDTH-OUT-REC.
           05 FW-OUT-EMPL-ID       PIC X(05).
           05 FW-OUT-NAME          PIC X(20).
           05 FW-OUT-TITLE         PIC X(20).
           05 FW-OUT-LOCATION      PIC X(20).
       01  WS-END-OF-FILE-SW       PIC X VALUE 'N'.
           88 EOF-REACHED          VALUE 'Y'.
       PROCEDURE DIVISION.
       0000-MAIN-LOGIC.
           PERFORM 1000-INITIALIZE.
           PERFORM 2000-PROCESS-RECORDS UNTIL EOF-REACHED.
           PERFORM 3000-TERMINATE.
           STOP RUN.
       1000-INITIALIZE.
           OPEN INPUT VSAM-FILE-IN.
           IF NOT VSAM-OK
              DISPLAY 'ERROR OPENING VSAM INPUT: ' WS-VSAM-STATUS
              MOVE 16 TO RETURN-CODE
              STOP RUN
           END-IF.
           OPEN OUTPUT TEXT-FILE-OUT.
           IF NOT TEXT-OK
              DISPLAY 'ERROR OPENING TEXT OUTPUT: ' WS-TEXT-STATUS
              MOVE 16 TO RETURN-CODE
              STOP RUN
           END-IF.
       2000-PROCESS-RECORDS.
           READ VSAM-FILE-IN
               AT END SET EOF-REACHED TO TRUE
               NOT AT END PERFORM 2100-WRITE-TEXT-RECORD
           END-READ.
       2100-WRITE-TEXT-RECORD.
           IF VSAM-OK
              MOVE VSAM-IN-EMPL-ID  TO FW-OUT-EMPL-ID
              MOVE VSAM-IN-NAME     TO FW-OUT-NAME
              MOVE VSAM-IN-TITLE    TO FW-OUT-TITLE
              MOVE VSAM-IN-LOCATION TO FW-OUT-LOCATION
              MOVE WS-FIXED-WIDTH-OUT-REC TO TEXT-OUT-REC
              WRITE TEXT-OUT-REC
              IF NOT TEXT-OK
                 DISPLAY 'ERROR WRITING TEXT RECORD: ' WS-TEXT-STATUS
                 MOVE 16 TO RETURN-CODE
                 STOP RUN
              END-IF
           ELSE
              DISPLAY 'ERROR READING VSAM RECORD: ' WS-VSAM-STATUS
              MOVE 16 TO RETURN-CODE
              STOP RUN
           END-IF.
       3000-TERMINATE.
           CLOSE VSAM-FILE-IN.
           CLOSE TEXT-FILE-OUT.
This program extracts the fields into a fixed-width format directly matching the TEXT-OUT-REC length of 65 bytes. The Python script in a later step will be responsible for parsing this fixed-width data and converting it into a proper CSV. The record length PIC X(65) for TEXT-OUT-REC is calculated as: 5 (ID) + 20 (Name) + 20 (Title) + 20 (Location) = 65.
Now that we have our sample VSAM KSDS on the mainframe and our COBOL extraction program (vsamextr.cbl) in our local src/ directory, we need to tell Grace how to compile, link, and execute this program.
Open your vsam-postgres-etl/grace.yml file. We'll first set up the config and datasets sections, and then define the jobs.
Edit your grace.yml to look like this (adjust YOURHLQ, YOUR.COBOL.STEPLIB, and the dataset names as needed for your environment):
config:
  profile: myzos                     # Replace with your Zowe profile name
  defaults:
    compiler:
      pgm: IGYCRCTL                  # Or your site's COBOL compiler program name
      parms: "OBJECT,NODECK,LIB"
      steplib: "YOUR.COBOL.STEPLIB"  # Replace with your COBOL compiler's STEPLIB DSN
    linker:
      pgm: IEWL                      # Or your site's linkage editor program name
      parms: "LIST,MAP,XREF"

datasets:
  jcl: "YOURHLQ.VSAM2PG.JCL"
  src: "YOURHLQ.VSAM2PG.SRC"
  loadlib: "YOURHLQ.VSAM2PG.LOADLIB" # An existing load library PDS on z/OS

jobs:
  # Job 1: Compile the COBOL program
  - name: CMPVSAMX
    type: compile
    inputs:
      - name: SYSIN
        path: src://vsamextr.cbl
    outputs:
      - name: SYSLIN
        path: zos-temp://vsamextr.obj
        dcb: "RECFM=FB,LRECL=80,BLKSIZE=3200"

  # Job 2: Link-edit the object deck into a load module
  - name: LNKVSAMX
    type: linkedit
    program: VSAMEXTR
    depends_on: [CMPVSAMX]
    inputs:
      - name: SYSLIN
        path: zos-temp://vsamextr.obj
    # SYSLMOD (output load module) is implicitly datasets.loadlib(VSAMEXTR)

  # Job 3: Execute the COBOL program to extract VSAM data
  - name: RUNVSAMX
    type: execute
    program: VSAMEXTR                # Program to run (from datasets.loadlib)
    depends_on: [LNKVSAMX]
    inputs:
      - name: VSAMIN
        path: zos://YOURHLQ.GRACE.VSAMDEMO.KSDS
        disp: SHR
    outputs:
      - name: TEXTOUT
        path: zos-temp://vsam_extract_fixedwidth.txt
        dcb: "RECFM=FB,LRECL=65,BLKSIZE=27950"
        space: "(TRK,(5,1),RLSE)"
defaults.compiler.steplib: Crucially, update YOUR.COBOL.STEPLIB to the dataset name of your COBOL compiler's load library (e.g., IGY.V6R4M0.SIGYCOMP). This is essential for the CMPVSAMX job to find the compiler.
Other defaults are typical values but can be adjusted for your site.
We're using YOURHLQ.VSAM2PG.JCL, .SRC, and .LOADLIB.
Replace YOURHLQ with your actual HLQ. Grace will attempt to create these PDSs during grace deck if they don't exist (except for loadlib which grace deck expects to be a pre-allocated PDS or PDSE).
program: VSAMEXTR tells Grace to execute the program we just linked. The default Grace JCL template for execute will include a STEPLIB DD DSN=YOURHLQ.VSAM2PG.LOADLIB,DISP=SHR.
inputs:
VSAMIN: Maps to DDNAME VSAMIN in the COBOL program, pointing to the VSAM KSDS (zos://YOURHLQ.GRACE.VSAMDEMO.KSDS). We use DISP=SHR as we're only reading.
outputs:
TEXTOUT: Maps to DDNAME TEXTOUT in the COBOL program. Grace will create a temporary dataset for zos-temp://vsam_extract_fixedwidth.txt. We've specified DCB to match our COBOL program's output record length (LRECL=65) and a reasonable block size. We also give it some space.
Next steps before running:
Save grace.yml with your environment-specific modifications (HLQ, Zowe profile, compiler STEPLIB).
Ensure your src/vsamextr.cbl file is saved.
Ensure the initvsam.jcl has been run successfully on your mainframe to create YOURHLQ.GRACE.VSAMDEMO.KSDS.
You are now ready to use grace deck to prepare these mainframe artifacts.
After the RUNVSAMX job executes on the mainframe, it will produce a sequential dataset containing the extracted VSAM records in a fixed-width text format (zos-temp://vsam_extract_fixedwidth.txt).
Our next step in the ETL pipeline is to transform this fixed-width file into a standard Comma-Separated Values (CSV) file. This CSV will be easier for our subsequent PostgreSQL loading script to process. We'll use a Python script for this transformation, executed by a Grace shell job.
Create a scripts directory in your Grace project root (vsam-postgres-etl/scripts/)
mkdir scripts
cd scripts
Create the Python script transform_to_csv.py
Inside the vsam-postgres-etl/scripts/ directory, create a new file named transform_to_csv.py. Add the following Python code:
import csv
import argparse
import sys


def transform_fixed_width_to_csv(input_filepath, output_filepath):
    """
    Reads a fixed-width file, parses records based on known COBOL layout,
    and writes them to a CSV file.
    """
    # Define the field widths and names based on our COBOL output record:
    #   FW-OUT-EMPL-ID   PIC X(05).
    #   FW-OUT-NAME      PIC X(20).
    #   FW-OUT-TITLE     PIC X(20).
    #   FW-OUT-LOCATION  PIC X(20).
    #   Total LRECL = 65
    field_definitions = [
        ('employee_id', 5),
        ('employee_name', 20),
        ('job_title', 20),
        ('location', 20)
    ]
    header = [field[0] for field in field_definitions]

    print(f"Starting transformation: {input_filepath} -> {output_filepath}")
    records_processed = 0

    try:
        with open(input_filepath, 'r', encoding='utf-8') as infile, \
             open(output_filepath, 'w', newline='', encoding='utf-8') as outfile:

            csv_writer = csv.writer(outfile)
            csv_writer.writerow(header)  # Write the header row

            for line in infile:
                record = {}
                current_pos = 0
                # Strip trailing newline/carriage return characters which might exist,
                # especially if downloaded from mainframe text files.
                clean_line = line.rstrip('\n\r')

                # Ensure line has expected length, otherwise skip or error
                if len(clean_line) != 65:
                    print(f"Warning: Skipping line of unexpected length ({len(clean_line)}): '{clean_line[:80]}...'", file=sys.stderr)
                    continue

                for field_name, field_width in field_definitions:
                    value = clean_line[current_pos : current_pos + field_width].strip()
                    record[field_name] = value
                    current_pos += field_width

                csv_writer.writerow([record[field_name] for field_name in header])
                records_processed += 1

    except FileNotFoundError:
        print(f"Error: Input file not found at {input_filepath}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred during transformation: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Transformation complete. Processed {records_processed} records.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Transform fixed-width mainframe extract to CSV.")
    parser.add_argument("--input", required=True, help="Path to the input fixed-width text file.")
    parser.add_argument("--output", required=True, help="Path for the output CSV file.")
    args = parser.parse_args()

    transform_fixed_width_to_csv(args.input, args.output)
Explanation of transform_to_csv.py:
We use the argparse module to accept input and output file paths as command-line arguments. This makes it easy to pass inputs and capture outputs from within our Grace workflow.
field_definitions: This list defines the structure of our fixed-width input file, based on the WS-FIXED-WIDTH-OUT-REC layout in vsamextr.cbl.
We read the input file line by line.
For each line, the script:
Strips trailing newline characters.
Checks if the line has the expected length (65 characters). Lines with unexpected lengths are skipped with a warning.
Slices the line based on field_definitions to extract each field's value.
It uses Python's csv module to write the header and data rows to the output CSV file, ensuring proper CSV formatting.
Includes basic error handling for file operations.
Save the transform_to_csv.py script in the vsam-postgres-etl/scripts/ directory.
You can test this script locally if you manually create a sample fixed-width input file. For example, create sample_fixed.txt:
00001John Doe            Engineer            New York            
00002Jane Smith          Analyst             Chicago             
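Then, from the project root, run the script against it and inspect the result (each input line must be exactly 65 characters, so pad the trailing location field with blanks; sample_out.csv is just an illustrative output name):

python3 scripts/transform_to_csv.py --input sample_fixed.txt --output sample_out.csv
cat sample_out.csv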
We now have our COBOL program defined in grace.yml to extract data to zos-temp://vsam_extract_fixedwidth.txt, and we have the Python script scripts/transform_to_csv.py ready to convert that fixed-width file into a CSV.
Let's add a shell job to our grace.yml to orchestrate this transformation. This job will:
Depend on the successful completion of the RUNVSAMX (COBOL execution) job.
Take zos-temp://vsam_extract_fixedwidth.txt as an input.
Grace will handle downloading this dataset from the mainframe to a local staging area and provide its path to our script via an environment variable.
Execute the transform_to_csv.py script.
Declare an output local-temp://formatted_data.csv, which our script will create. Grace will manage this temporary local file.
Append the following XFRMCSV job to the jobs array in your grace.yml:
# ... (config, datasets, and previous jobs: CMPVSAMX, LNKVSAMX, RUNVSAMX) ...

  # Job 4: Transform the extracted fixed-width text file to CSV locally
  - name: XFRMCSV
    type: shell
    depends_on: [RUNVSAMX]           # Depends on the COBOL extraction job
    with:
      inline: python3 ./scripts/transform_to_csv.py --input "$GRACE_INPUT_VSAMTEXT" --output "$GRACE_OUTPUT_FORMATTEDCSV"
    inputs:
      - name: VSAMTEXT
        path: zos-temp://vsam_extract_fixedwidth.txt   # Output from job RUNVSAMX
    outputs:
      - name: FORMATTEDCSV
        path: local-temp://formatted_data.csv
Explanation of the XFRMCSV job:
type: shell identifies this as a job to be run locally by Grace.
depends_on: [RUNVSAMX] ensures this transformation job only runs after the mainframe data extraction is complete and successful.
$GRACE_INPUT_VSAMTEXT: Grace will resolve this to the staging file where zos-temp://vsam_extract_fixedwidth.txt was downloaded.
$GRACE_OUTPUT_FORMATTEDCSV: Grace will set this environment variable to a specific path in its local temporary staging area (because the output path is local-temp://). Our Python script's --output argument will receive this path, and the script will write the resulting CSV there.
Declares that this job consumes the output from the RUNVSAMX job.
Grace will download the content of the z/OS temporary dataset vsam_extract_fixedwidth.txt (whose actual DSN was generated by Grace) to a local temporary file.
The path to this local temporary file will be exposed via the environment variable GRACE_INPUT_VSAMTEXT.
Declares that this job will produce a temporary local file named formatted_data.csv.
Grace will determine a unique local path for this file within its staging area.
This path will be exposed via the environment variable GRACE_OUTPUT_FORMATTEDCSV. The Python script must write its CSV output to this exact path.
This local-temp:// file will be available for downstream jobs and will be cleaned up by Grace at the end of the workflow unless keep: true is set.
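For example, if you want to inspect the intermediate CSV after a run, you could retain it; a minimal sketch, assuming keep is set directly on the output declaration:

    outputs:
      - name: FORMATTEDCSV
        path: local-temp://formatted_data.csv
        keep: true   # assumed placement; retains the staged file after the workflow completes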
While our transform_to_csv.py uses argparse, shell scripts often read environment variables directly. If you prefer not to pass command-line arguments, you could modify the Python script to read os.environ.get('GRACE_INPUT_VSAMTEXT') and os.environ.get('GRACE_OUTPUT_FORMATTEDCSV'). For this tutorial, explicitly passing them via the inline command makes the data flow clear.
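A minimal sketch of that alternative, replacing the argparse block at the bottom of transform_to_csv.py with direct environment variable lookups:

import os
import sys

if __name__ == "__main__":
    # Grace exports these variables for the job's declared input/output names.
    input_path = os.environ.get('GRACE_INPUT_VSAMTEXT')
    output_path = os.environ.get('GRACE_OUTPUT_FORMATTEDCSV')
    if not input_path or not output_path:
        print("GRACE_INPUT_VSAMTEXT and GRACE_OUTPUT_FORMATTEDCSV must be set.", file=sys.stderr)
        sys.exit(1)
    transform_fixed_width_to_csv(input_path, output_path)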
With our VSAM data extracted and transformed into a CSV file (local-temp://formatted_data.csv), the next step is to load this CSV data into our PostgreSQL database. We'll create another Python script for this task.
This script will need to:
Read database connection parameters (host, port, database name, user, password). For this tutorial, we'll expect these to be set as environment variables.
In a production environment, your connection parameters might be hosted in a secure credential store like AWS Secrets Manager. In that case, you would access them at runtime within the script using your cloud provider's SDK.
Take the path to the CSV file as a CLI argument.
Connect to the PostgreSQL instance.
Create the target table if it doesn't already exist.
Insert the CSV records into that table.
In your vsam-postgres-etl/scripts/ directory, create a new file named load_to_postgres.py.
# scripts/load_to_postgres.py
import psycopg2
import csv
import os
import argparse
import sys


def get_db_connection():
    """Establishes a connection to the PostgreSQL database."""
    try:
        conn = psycopg2.connect(
            host=os.environ.get('PG_HOST'),
            port=os.environ.get('PG_PORT', '5432'),  # Default PostgreSQL port
            dbname=os.environ.get('PG_DBNAME'),
            user=os.environ.get('PG_USER'),
            password=os.environ.get('PG_PASSWORD')
        )
        print("Successfully connected to PostgreSQL database.")
        return conn
    except psycopg2.OperationalError as e:
        print(f"Error: Could not connect to PostgreSQL database: {e}", file=sys.stderr)
        print("Please ensure PG_HOST, PG_PORT, PG_DBNAME, PG_USER, and PG_PASSWORD environment variables are set correctly.", file=sys.stderr)
        print("Also, check network connectivity (e.g., security groups in AWS).", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred during database connection: {e}", file=sys.stderr)
        sys.exit(1)


def create_table_if_not_exists(conn):
    """Creates the employee_data table if it doesn't already exist."""
    table_creation_sql = """
        CREATE TABLE IF NOT EXISTS employee_data (
            employee_id   VARCHAR(5) PRIMARY KEY,
            employee_name VARCHAR(50),
            job_title     VARCHAR(50),
            location      VARCHAR(50)
        );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(table_creation_sql)
        conn.commit()
        print("Table 'employee_data' ensured to exist.")
    except Exception as e:
        print(f"Error creating table: {e}", file=sys.stderr)
        conn.rollback()  # Rollback in case of error during table creation
        sys.exit(1)


def load_csv_to_postgres(conn, csv_filepath):
    """Loads data from a CSV file into the employee_data table."""
    # For this tutorial, we'll clear the table before loading for idempotency.
    # In a production scenario, you might use UPSERT or other strategies.
    clear_table_sql = "DELETE FROM employee_data;"
    insert_sql = """
        INSERT INTO employee_data (employee_id, employee_name, job_title, location)
        VALUES (%s, %s, %s, %s);
    """
    records_inserted = 0

    print(f"Starting data load from: {csv_filepath}")
    try:
        with open(csv_filepath, 'r', newline='', encoding='utf-8') as csvfile:
            csv_reader = csv.DictReader(csvfile)  # Reads rows as dictionaries
            with conn.cursor() as cur:
                # Clear existing data for this tutorial run
                cur.execute(clear_table_sql)
                print("Cleared existing data from 'employee_data' table.")

                for row in csv_reader:
                    # Ensure keys match the CSV header (which should match
                    # field_definitions from the previous script)
                    cur.execute(insert_sql, (
                        row['employee_id'],
                        row['employee_name'],
                        row['job_title'],
                        row['location']
                    ))
                    records_inserted += 1
                conn.commit()  # Commit all inserts
        print(f"Successfully loaded {records_inserted} records into 'employee_data' table.")
    except FileNotFoundError:
        print(f"Error: CSV input file not found at {csv_filepath}", file=sys.stderr)
        sys.exit(1)
    except KeyError as e:
        print(f"Error: Missing expected column in CSV: {e}. Ensure CSV headers match expected fields (employee_id, employee_name, job_title, location).", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred during data loading: {e}", file=sys.stderr)
        conn.rollback()  # Rollback on any error during insert
        sys.exit(1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Load CSV data into PostgreSQL.")
    parser.add_argument("--csv_file", required=True, help="Path to the input CSV file.")
    # DB connection parameters will be read from environment variables by get_db_connection()
    args = parser.parse_args()

    db_connection = None
    try:
        db_connection = get_db_connection()
        create_table_if_not_exists(db_connection)
        load_csv_to_postgres(db_connection, args.csv_file)
    finally:
        if db_connection:
            db_connection.close()
            print("Database connection closed.")
Retrieves PostgreSQL connection parameters from environment variables (PG_HOST, PG_PORT, PG_DBNAME, PG_USER, PG_PASSWORD). This is a more secure approach than hardcoding or passing them as CLI arguments.
Takes the database connection and the path to the CSV file as input.
For tutorial simplicity, it runs DELETE FROM employee_data; before inserting new data. This ensures that re-running the workflow doesn't just append duplicate records.
In a production scenario, you might implement an "upsert" (update if exists, insert if not) or a more sophisticated data synchronization strategy; a sketch follows this list.
Uses csv.DictReader to easily access CSV columns by their header names.
Constructs and executes SQL INSERT statements.
Commits the transaction after all rows are processed.
Includes error handling for file not found, missing CSV columns, and general database errors.
Uses argparse to get the --csv_file path from the command line. This is where we will pass the GRACE_INPUT_* environment variable, pointing to the CSV file produced upstream.
Orchestrates the connection, table creation, and data loading.
Ensures the database connection is closed in a finally block.
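As a reference for the production-style alternative mentioned above, a PostgreSQL upsert keyed on employee_id could replace the DELETE-then-INSERT pattern. This statement is illustrative and not part of the tutorial's script; the %s placeholders match the psycopg2 parameter style used throughout:

INSERT INTO employee_data (employee_id, employee_name, job_title, location)
VALUES (%s, %s, %s, %s)
ON CONFLICT (employee_id) DO UPDATE
    SET employee_name = EXCLUDED.employee_name,
        job_title     = EXCLUDED.job_title,
        location      = EXCLUDED.location;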
We now have the transformed CSV data available as local-temp://formatted_data.csv and the Python script scripts/load_to_postgres.py ready to load this data into our AWS RDS PostgreSQL database.
The final step is to add a shell job to our grace.yml that orchestrates this loading process. This job will:
Depend on the successful completion of the XFRMCSV (CSV transformation) job.
Take local-temp://formatted_data.csv as an input. Grace will provide the local path to this file to our script via an environment variable.
Execute the load_to_postgres.py script.
Rely on the database connection environment variables (PG_HOST, PG_USER, etc.) being set in the shell environment.
Append the following LOADDBPG job to the jobs array in your grace.yml:
# ... (config, datasets, and previous jobs: CMPVSAMX, LNKVSAMX, RUNVSAMX, XFRMCSV) ...

  # Job 5: Load the transformed CSV data into PostgreSQL
  - name: LOADDBPG
    type: shell
    depends_on: [XFRMCSV]            # Depends on the CSV transformation job
    with:
      inline: |
        source .env
        echo "Input CSV file (from GRACE_INPUT_CSVFILE) will be: $GRACE_INPUT_CSVFILE"
        python3 ./scripts/load_to_postgres.py --csv_file "$GRACE_INPUT_CSVFILE"
    inputs:
      - name: CSVFILE                # Grace sets env var $GRACE_INPUT_CSVFILE
        path: local-temp://formatted_data.csv          # Output from XFRMCSV
Explanation of the LOADDBPG job:
type: shell indicates a local shell execution job type.
depends_on: [XFRMCSV] ensures this job only runs after the XFRMCSV job successfully creates the formatted_data.csv.
First we need to "deck" our workflow. This command will:
Generate JCL for the CMPVSAMX, LNKVSAMX, and RUNVSAMX jobs.
Upload the COBOL source (vsamextr.cbl) to YOURHLQ.VSAM2PG.SRC(VSAMEXTR), or whatever you set for datasets.src.
Upload the generated JCL for the three mainframe jobs to YOURHLQ.VSAM2PG.JCL, or whatever you set for datasets.jcl.
Ensure the necessary PDS datasets (jcl and src) exist, creating them if needed; recall that the loadlib PDS or PDSE must already be allocated.
In your terminal, from the vsam-postgres-etl project root directory, run:
grace deck
You should see output describing Grace synchronizing your local artifacts to the mainframe.
If any errors occur, review the messages. Common issues at this stage include incorrect Zowe profile configuration, insufficient mainframe permissions for dataset creation/upload, or typos in your grace.yml dataset names or compiler STEPLIB.
Once grace deck completes successfully, all necessary mainframe artifacts are in place. Now, run the entire workflow:
grace run
You will see output in your terminal as Grace executes each job in sequence:
CMPVSAMX: Compiles vsamextr.cbl. You should see it submit JOBxxxxx and wait for a successful return code (e.g., CC 0000 or CC 0004).
LNKVSAMX: Links the object deck into YOURHLQ.VSAM2PG.LOADLIB(VSAMEXTR).
RUNVSAMX: Executes VSAMEXTR. This job reads YOURHLQ.GRACE.VSAMDEMO.KSDS and writes to the temporary dataset Grace creates for zos-temp://vsam_extract_fixedwidth.txt.
XFRMCSV:
Grace downloads the content of zos-temp://vsam_extract_fixedwidth.txt to a local staging file.
The ShellHandler executes python3 ./scripts/transform_to_csv.py --input <path_to_downloaded_file> --output <path_to_local_temp_csv>.
The output CSV is written to a Grace-managed local-temp://formatted_data.csv.
LOADDBPG:
The ShellHandler executes the inline script:
source .env loads your database credentials into the shell's environment.
If the entire workflow is successful, Grace will print a confirmation message and indicate the location of the log directory (e.g., .grace/logs/YYYYMMDDTHHMMSS_run_workflow-uuid/).
Navigate to the run-specific directory inside .grace/logs/.
Inspect summary.json for an overview of each job's status and return codes.
Look at individual JOBID_JOBNAME.json files for per-job execution details.
Examine the stdout and stderr captured for the XFRMCSV and LOADDBPG shell jobs within their respective JSON log files.
Inspect intermediate files (optional, for debugging):
If you had set keep: true on zos-temp://vsam_extract_fixedwidth.txt or local-temp://formatted_data.csv, you could inspect these files.
The zos-temp:// dataset DSN can be found in the JCL in .grace/deck/RUNVSAMX.jcl or from .grace/logs/<workflow_run>/workflow.log.
The local-temp:// file would be in .grace/logs/<run_id>/.local-staging/formatted_data.csv.
Verify data in PostgreSQL:
Connect to your AWS RDS PostgreSQL instance using your preferred SQL client (e.g. psql, pgAdmin, DBeaver).
Query the employee_data table:
SELECT * FROM employee_data ORDER BY employee_id;
You should see the 8 sample records from the VSAM file, now populated in your PostgreSQL table:
 employee_id |   employee_name    |    job_title    |    location
-------------+--------------------+-----------------+-----------------
 00001       | John Doe           | Engineer        | New York
 00002       | Jane Smith         | Analyst         | Chicago
 00003       | Peter Jones        | Manager         | London
 00004       | Alice Williams     | Developer       | San Francisco
 00005       | Bob Brown          | Architect       | New York
 00006       | Carol White        | Tester          | Chicago
 00007       | David Green        | Support         | London
 00008       | Eve Black          | Admin           | San Francisco
(8 rows)
Congratulations! You have successfully built and executed a complete Extract, Transform, Load (ETL) pipeline using Grace, moving data from a mainframe VSAM KSDS to an AWS RDS PostgreSQL database.
In this tutorial, you learned how to:
Define a multi-step workflow in grace.yml involving both z/OS and local shell execution environments.
Orchestrate z/OS jobs for compiling a COBOL program, link-editing it, and executing it to extract VSAM data to a sequential file using zos-temp:// for intermediate mainframe storage.
Utilize Grace's shell job module to:
Execute a Python script that downloads a zos-temp:// dataset from the mainframe.
Transform the downloaded fixed-width data into CSV format, outputting to a local-temp:// file.
Execute another Python script that reads the local-temp:// CSV.
Load data into a cloud-hosted PostgreSQL database, managing credentials via environment variables (sourced from a .env file).
Leverage Grace's virtual path system (zos://, src://, zos-temp://, local-temp://) to manage data flow seamlessly between mainframe and local processes.
Use grace deck to prepare mainframe artifacts and grace run to execute and monitor the entire hybrid workflow.
This example showcases Grace's power in bridging traditional mainframe systems with modern cloud platforms and development practices. You've seen how a declarative YAML definition can automate complex, multi-platform data pipelines.
This tutorial covered a specific ETL scenario, but the principles and Grace features you've used can be applied to a wide range of automation tasks.
Explore other Grace features:
Dive deeper into the YAML Specification to learn about all job types, directives, and advanced features.
Try out different JCL sourcing options using the job.jcl field (z/OS Job Specifics).
Experiment with JCL Templates to customize JCL generation.
Look into grace submit for running workflows asynchronously.
The possibilities for modernizing and automating mainframe-related processes with Grace are extensive. We encourage you to adapt this tutorial's concepts to your own use cases and explore what you can build!
If you have questions, encounter issues, or want to share what you've built, please join our community discussions or open an issue on GitHub.