CWL CommandLineTool for stage-out

Below is the stage-out.cwl CWL (Common Workflow Language) document for a command-line tool that executes a Python script.

The document is designed to run in a container and to execute a Python script named stage.py:

cwlVersion: v1.0

class: CommandLineTool
id: stage-out

doc: "Stage-out the results to S3"
inputs:
  s3_bucket:
    type: string
  sub_path:
    type: string
  aws_access_key_id:
    type: string
  aws_secret_access_key:
    type: string
  region_name:
    type: string
  endpoint_url:
    type: string
  stac_catalog:
    type: Directory
outputs:
  s3_catalog_output:
    outputBinding:
      outputEval: ${  return "s3://" + inputs.s3_bucket + "/" + inputs.sub_path + "/catalog.json"; }
    type: string
baseCommand:
  - python
  - stage.py
arguments:
  - $( inputs.stac_catalog.path )
  - $( inputs.s3_bucket )
  - $( inputs.sub_path )
requirements:
  DockerRequirement:
    dockerPull: ghcr.io/terradue/ogc-eo-application-package-hands-on/stage:1.3.2
  InlineJavascriptRequirement: {}
  EnvVarRequirement:
    envDef:
      aws_access_key_id: $( inputs.aws_access_key_id )
      aws_secret_access_key: $( inputs.aws_secret_access_key )
      aws_region_name: $( inputs.region_name )
      aws_endpoint_url: $( inputs.endpoint_url )
  ResourceRequirement: {}
  InitialWorkDirRequirement:
    listing:
      - entryname: stage.py
        entry: |-
          import os
          import sys
          import pystac
          import botocore
          import boto3
          import shutil
          from pystac.stac_io import DefaultStacIO, StacIO
          from urllib.parse import urlparse

          cat_url = sys.argv[1]
          bucket = sys.argv[2]
          subfolder = sys.argv[3]

          aws_access_key_id = os.environ["aws_access_key_id"]
          aws_secret_access_key = os.environ["aws_secret_access_key"]
          region_name = os.environ["aws_region_name"]
          endpoint_url = os.environ["aws_endpoint_url"]

          shutil.copytree(cat_url, "/tmp/catalog")
          cat = pystac.read_file(os.path.join("/tmp/catalog", "catalog.json"))

          class CustomStacIO(DefaultStacIO):
              """Custom STAC IO class that uses boto3 to read from S3."""

              def __init__(self):
                  self.session = botocore.session.Session()
                  self.s3_client = self.session.create_client(
                      service_name="s3",
                      use_ssl=True,
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      endpoint_url=endpoint_url,
                      region_name=region_name,
                  )

              def write_text(self, dest, txt, *args, **kwargs):
                  parsed = urlparse(dest)
                  if parsed.scheme == "s3":
                      self.s3_client.put_object(
                          Body=txt.encode("UTF-8"),
                          Bucket=parsed.netloc,
                          Key=parsed.path[1:],
                          ContentType="application/geo+json",
                      )
                  else:
                      super().write_text(dest, txt, *args, **kwargs)


          client = boto3.client(
              "s3",
              aws_access_key_id=aws_access_key_id,
              aws_secret_access_key=aws_secret_access_key,
              endpoint_url=endpoint_url,
              region_name=region_name,
          )

          StacIO.set_default(CustomStacIO)

          for item in cat.get_items():
              for key, asset in item.get_assets().items():
                  s3_path = os.path.normpath(
                      os.path.join(subfolder, item.id, asset.href)
                  )
                  print(f"upload {asset.href} to s3://{bucket}/{s3_path}",file=sys.stderr)
                  client.upload_file(
                      asset.get_absolute_href(),
                      bucket,
                      s3_path,
                  )
                  asset.href = f"s3://{bucket}/{s3_path}"
                  item.add_asset(key, asset)

          cat.normalize_hrefs(f"s3://{bucket}/{subfolder}")

          for item in cat.get_items():
              # upload item to S3
              print(f"upload {item.id} to s3://{bucket}/{subfolder}", file=sys.stderr)
              pystac.write_file(item, dest_href=item.get_self_href())

          # upload catalog to S3
          print(f"upload catalog.json to s3://{bucket}/{subfolder}", file=sys.stderr)
          pystac.write_file(cat, dest_href=cat.get_self_href())

          print(f"s3://{bucket}/{subfolder}/catalog.json", file=sys.stdout)

The Python script stage.py uploads a SpatioTemporal Asset Catalog (STAC) to an S3 object storage bucket. The script uses the boto3 library for the S3 operations and customizes the STAC I/O so that STAC objects are written directly to the bucket.

Here's a breakdown of what the script does:

  • The script imports the necessary libraries and modules: os, sys, shutil, pystac, botocore, and boto3.

  • It reads the command-line arguments to get the path to the STAC catalog, the S3 bucket name, and the subfolder where the catalog will be uploaded.

  • It retrieves the S3 credentials and endpoint URL from environment variables.

  • The script copies the STAC catalog from the given path to a temporary directory, /tmp/catalog.

  • A custom STAC I/O class (CustomStacIO) is defined. It extends pystac's DefaultStacIO class and overrides write_text so that STAC JSON content addressed with an s3:// URL is written to the bucket with put_object.

  • A connection to S3 is established using boto3 with the provided S3 credentials and endpoint URL.

  • The default STAC I/O class is set to the custom CustomStacIO.

  • The script iterates through the items in the STAC catalog and their assets. For each asset, it uploads the data to the specified S3 bucket under the given subfolder and updates the asset's href to point to the S3 location (a small worked example follows this list).

  • After uploading all assets, it normalizes the catalog's hrefs to point to the S3 location.

  • The script then writes each item of the catalog to its new S3 self href.

  • Finally, it uploads the catalog itself to S3.

  • The script prints the S3 URL of the uploaded catalog to the standard output.
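
As a concrete illustration of how the S3 key for each asset is built from the sub-path, the item identifier and the asset href, here is a small worked example; all values below are made up and not taken from an actual run.

import os

sub_path = "processing-results/run-1"        # hypothetical sub_path input
item_id = "S2B_10TFK_20210713_0_L2A"         # hypothetical STAC item id
asset_href = "otsu.tif"                      # hypothetical relative asset href

# same construction as in the upload loop of stage.py
s3_path = os.path.normpath(os.path.join(sub_path, item_id, asset_href))
print(s3_path)
# processing-results/run-1/S2B_10TFK_20210713_0_L2A/otsu.tif

# the href written back into the item afterwards (bucket name is a placeholder)
print(f"s3://bids23/{s3_path}")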

This script uploads a STAC catalog and its associated assets to an S3 bucket, making them accessible through the S3 endpoint. It uses a custom STAC I/O class to handle the S3 interactions.
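
Once the stage-out step has completed, the staged catalog can be inspected directly on the object storage. The sketch below is not part of the Application Package; it only illustrates, with boto3 and placeholder values for the bucket and sub-path, how the uploaded catalog.json could be fetched and its links listed. The credentials are assumed to be available in the same environment variables the CWL tool defines.

# Sketch only: fetch the staged catalog back from S3 and list its links.
import json
import os

import boto3

bucket = "bids23"                              # placeholder bucket name
sub_path = "processing-results/<run-uuid>"     # placeholder sub_path

client = boto3.client(
    "s3",
    aws_access_key_id=os.environ["aws_access_key_id"],
    aws_secret_access_key=os.environ["aws_secret_access_key"],
    endpoint_url=os.environ["aws_endpoint_url"],
    region_name=os.environ["aws_region_name"],
)

# download catalog.json and print the links it advertises
response = client.get_object(Bucket=bucket, Key=f"{sub_path}/catalog.json")
catalog_dict = json.loads(response["Body"].read())
for link in catalog_dict.get("links", []):
    print(link.get("rel"), link.get("href"))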

The AWS credentials and endpoint URL need to be properly configured in the environment variables for the script to work.

The container image for the stage-out step can be built with:

export WORKSPACE=/workspace/app-package-training-bids23

podman \
    build \
    --format docker \
    -t localhost/stage:latest \
    ${WORKSPACE}/water-bodies/command-line-tools/stage

To run the stage-out step, one would run:

cwltool \
    --podman \
    ${WORKSPACE}/cwl-cli/stage-out.cwl \
    --aws_access_key_id $( cat ~/.aws/credentials | grep aws_access_key_id | cut -d "=" -f 2 ) \
    --aws_secret_access_key $( cat ~/.aws/credentials | grep aws_secret_access_key | cut -d "=" -f 2 ) \
    --endpoint_url $( cat ~/.aws/config | grep endpoint_url | head -n 1 | cut -d "=" -f 2 ) \
    --region_name $( cat ~/.aws/config | grep region | cut -d "=" -f 2 ) \
    --s3_bucket bids23 \
    --sub_path processing-results/$( cat /proc/sys/kernel/random/uuid ) \
    --stac_catalog /workspace/runs
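
As an alternative to the grep and cut pipelines above, the same values can be read with Python's configparser module. This is only a sketch: it assumes the credentials, endpoint_url and region entries sit under the default profile of ~/.aws/credentials and ~/.aws/config, which may not match every setup.

# Sketch only: read the AWS credential and configuration values with configparser.
import configparser
import os

credentials = configparser.ConfigParser()
credentials.read(os.path.expanduser("~/.aws/credentials"))

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/config"))

aws_access_key_id = credentials["default"]["aws_access_key_id"]
aws_secret_access_key = credentials["default"]["aws_secret_access_key"]
endpoint_url = config["default"]["endpoint_url"]   # assumed to be a plain key in [default]
region_name = config["default"]["region"]

# the values can then be passed to cwltool as --aws_access_key_id, etc.
print(region_name, endpoint_url)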