Snowflake SNS integration for Snowpipe and task failure notifications

Overview

Snowflake can push error notifications to a cloud messaging service when it encounters errors while loading data through Snowpipe or when a Snowflake task fails. Snowpipe notifications describe the errors encountered in each file, enabling further analysis of the data in those files. Task error notifications describe any errors encountered while a task executes SQL code.

Currently, cross-cloud support is not available for push notifications. Configure error notification support for the messaging service provided by the cloud platform where your Snowflake account is hosted.

Since our Snowflake account is hosted in AWS, the push notification setup has been done in AWS.

The SNS integration is the same for both Snowpipe and Snowflake tasks. Snowflake has documented the steps in enabling-error-notifications.

Snowflake SnowPipe and Tasks Triage

We noticed a recurring situation:

  • Occasionally we see a drop in data and only notice it after some time. We used to take a reactive approach, acting only after being informed by the business side, which is not the preferred way. We have now implemented a mechanism that pushes a warning about missing or incomplete data, so we are proactive and alerted immediately when an error happens.
  • The pipelines we monitor (version_db, Snowplow, PTO) run via either Snowflake tasks or the Snowpipe mechanism.
  • We used a comprehensive approach to reveal the shape and status of the pipelines inside Snowflake.

Workflow

graph LR
    PM[Pipelines monitoring]

    subgraph DE
        TM[Technical monitoring]
        PH[Pipeline health]
        DL[Data freshness check]
    end
    subgraph DE_AE[DE+AE]
        OD[Observing data]
        MT[Metrics throughput check]
    end

    PM-->TM
    TM-->PH
    TM-->DL
    PM-->OD
    OD-->MT

Technical monitoring

We have to make sure that we get notified via Slack if there are errors in Snowpipe and/or Snowflake tasks. If an error occurs and the root cause turns out to be upstream, we solve it in collaboration with the source owners. The Data Platform team is responsible for the health of the pipelines.

graph LR
    SNS_SETUP
    Lambda-->SLACK
    SNS_SETUP-->SNS_TOPIC
    subgraph Snowflake
        TASK(Tasks)
        SNOWPIPE(Snowpipes)
        SNS_SETUP[SNS setup for Tasks and Snowpipes]
        SNOWPIPE-->SNS_SETUP
        TASK-->SNS_SETUP
    end

    subgraph AWS [AWS]
        SNS_TOPIC[SNS Topic] --> Lambda[Lambda function Python]
    end

    subgraph Slack
        SLACK[#data-pipelines channel alert]
    end

The names and commands used for the setup are detailed below.

Step 1: Created an SNS topic in AWS named gitlab-snowflake-notification, following the instructions.

For this, full access to the SNS topic must be provided to the user. The same SNS topic can be used for all Snowflake task and Snowpipe push notifications. Capture the ARN value of the topic, as it will be required in step 4.

Step 2: Requested IT help for creating the IAM policy in AWS

IT creates an AWS Identity and Access Management (IAM) policy that grants permission to publish to the SNS topic.
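
As a reference, a minimal sketch of such a policy (the exact policy is managed by IT; <topic_arn> is the topic ARN captured in step 1):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": ["sns:Publish"],
      "Resource": "<topic_arn>"
    }
  ]
}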

Step 3: Requested IT help for creating the AWS IAM role in AWS.

Create an AWS IAM role to which the privileges on the SNS topic are assigned. Record the Role ARN value located on the role summary page.

Step 4: Creating the Notification Integration in Snowflake

To create Notification Integration ACCOUNTADMIN privileges are required.

CREATE OR REPLACE NOTIFICATION INTEGRATION gitlab_data_notification_int
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = '<topic_arn>'
  AWS_SNS_ROLE_ARN = '<iam_role_arn>';

Step 5: Granting Snowflake Access to the SNS Topic

Run the query below in Snowflake:

  DESC NOTIFICATION INTEGRATION gitlab_data_notification_int;

Capture the SF_AWS_IAM_USER_ARN and SF_AWS_EXTERNAL_ID values from the output, then modify the trust relationship in the IAM role by setting:

 {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<sf_aws_iam_user_arn>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<sf_aws_external_id>"
        }
      }
    }
  ]
}

Updating the trust policy will require IT help.

Step 6: Grant usage permission on integration to loader role

With the ACCOUNTADMIN role, execute the statement below in Snowflake.

 GRANT USAGE ON INTEGRATION gitlab_data_notification_int TO ROLE loader;

Note: The above setup should not be modified; changing any step breaks the integration and requires redoing all of steps 1 through 6.

Step 7: Create a Lambda function to send the Slack notification

For this, the user needs permission to create a Lambda function and the ListRoles permission in AWS. If either of these privileges is missing, create an AR issue.

For creating the Lambda function, select the following:

  1. Author from scratch
  2. Give a unique name to the Lambda function
  3. Select any Python version (everything >= 3.10 is fine)
  4. Keep the default architecture
  5. Under Change default execution role, select Use an existing role, and then select Gitlab-lambda-snowflake
  6. Click the Create function button

More details are shown in the image below.

LambdaFunctionCreate.png

Once the function is created, under Code you can set up the basic Python code to send the Slack notification. Here is an example from our gitlab_snowflake_notification Lambda code:

import urllib3
import json
import os

http = urllib3.PoolManager()

def lambda_handler(event, context):
    # The Slack incoming-webhook URL is stored in a Lambda environment variable.
    url = os.environ["slack_webhook_url"]
    data = event["Records"][0]
    timestamp = data["Sns"]["Timestamp"].replace("T", " ").replace("Z", "")
    task_details = json.loads(data["Sns"]["Message"])

    # Snowflake sends 'taskName' for task failures and 'pipeName' for
    # Snowpipe failures; fall back to the raw message for anything else.
    if "taskName" in task_details:
        failure_type_name = task_details["taskName"]
        error_message = task_details["messages"][0]["errorMessage"]
        title = "Snowflake Task Failure Alert"
    elif "pipeName" in task_details:
        failure_type_name = task_details["pipeName"]
        error_message = task_details["messages"][0]["firstError"]
        title = "Snowpipe Failure Alert"
    else:
        failure_type_name = "unknown"
        error_message = data["Sns"]["Message"]
        title = "Snowflake Failure Alert"

    error_message_markdown = f"```{error_message}```"

    log_link = "https://gitlab.com/gitlab-data/runbooks/-/blob/main/triage_issues/snowflake_pipeline_failure_triage.md"
    log_link_markdown = f"<{log_link}|Runbook>"

    message = {
        "channel": "#data-pipelines",
        "username": "SNOWFLAKE_TASK_PIPE",
        "icon_emoji": ":snowflake:",
        "attachments": [{
            "fallback": "Snowflake Task Failure Alert",
            "color": "#FF0000",
            "fields": [
                {"title": title, "value": failure_type_name},
                {"title": "Timestamp", "value": timestamp},
                {"title": "Error Message", "value": error_message_markdown},
                {"title": "Steps to view error message in snowflake", "value": log_link_markdown},
            ],
        }],
    }

    # Post the alert to the Slack webhook and log the response.
    encoded_msg = json.dumps(message).encode("utf-8")
    resp = http.request("POST", url, body=encoded_msg)

    print({
        "status_code": resp.status,
        "response": resp.data,
    })
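
To test the function from the Lambda console, a test event can be used. The sketch below is an assumed task-failure payload containing only the fields the handler reads (the real SNS message from Snowflake carries additional fields):

{
  "Records": [
    {
      "Sns": {
        "Timestamp": "2024-01-01T00:00:00.000Z",
        "Message": "{\"taskName\": \"RAW.PROMETHEUS.PROMETHEUS_LOAD_TASK\", \"messages\": [{\"errorMessage\": \"SQL compilation error\"}]}"
      }
    }
  ]
}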

Things to consider during the implementation:

  • A good practice is to encrypt the environment variables you plan to use in the AWS Lambda function. Here is the comprehensive guideline on how to secure environment variables; a sketch of reading such a variable follows below.
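
As an illustration only (not the deployed code), a minimal sketch of reading a KMS-encrypted environment variable at runtime, assuming it was encrypted with the Lambda console's encryption helpers:

import base64
import os

import boto3

def get_slack_webhook_url():
    # With the console encryption helpers enabled, the environment variable
    # holds a base64-encoded KMS ciphertext instead of the plaintext URL.
    kms = boto3.client("kms")
    decrypted = kms.decrypt(
        CiphertextBlob=base64.b64decode(os.environ["slack_webhook_url"]),
        EncryptionContext={
            "LambdaFunctionName": os.environ["AWS_LAMBDA_FUNCTION_NAME"]
        },
    )
    return decrypted["Plaintext"].decode("utf-8")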

The function above sends the event notification information to Slack for Snowflake task and Snowpipe failures, and it will look like:

slack_alert_example.png

Step 8: Add the SNS topic created in step 1 as a trigger to the Lambda function

This will trigger the Lambda function, which will send the Slack notification.

Step 9: Alter the Snowpipe and Snowflake task to send notifications on failure

In this example it has been done for one of the Snowflake tasks:

ALTER TASK RAW.PROMETHEUS.PROMETHEUS_LOAD_TASK SUSPEND;
ALTER TASK RAW.PROMETHEUS.PROMETHEUS_LOAD_TASK SET ERROR_INTEGRATION = gitlab_data_notification_int;
ALTER TASK RAW.PROMETHEUS.PROMETHEUS_LOAD_TASK RESUME;

The task must be suspended before the integration can be applied.
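
For a Snowpipe, the equivalent statement is below; the pipe name raw.snowplow.snowplow_pipe is a hypothetical placeholder:

ALTER PIPE raw.snowplow.snowplow_pipe SET ERROR_INTEGRATION = gitlab_data_notification_int;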

Once the above is defined, a notification will be sent to the Slack channel on any failure.

All the required permissions in the AWS account have been provided to all Data Platform team members. Permission to use the Snowflake integration has also been granted to the loader role.

Note: Snowpipe error notifications only work when the ON_ERROR copy option is set to SKIP_FILE (the default value). Snowpipe will not send any error notifications if the ON_ERROR copy option is set to CONTINUE.
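
For illustration, a hypothetical pipe definition that sets the ON_ERROR copy option explicitly (all object names are placeholders):

CREATE PIPE raw.snowplow.snowplow_pipe
  AUTO_INGEST = TRUE
  ERROR_INTEGRATION = gitlab_data_notification_int
  AS
  COPY INTO raw.snowplow.events
  FROM @raw.snowplow.events_stage
  FILE_FORMAT = (TYPE = 'JSON')
  ON_ERROR = SKIP_FILE;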

You can use the NOTIFICATION_HISTORY table function to query the history of notifications sent through Snowpipe. For more information, refer to NOTIFICATION_HISTORY documentation.
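
For example, a query along these lines lists the notifications sent through the integration in the last 24 hours:

SELECT *
FROM TABLE(INFORMATION_SCHEMA.NOTIFICATION_HISTORY(
  START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
  END_TIME => CURRENT_TIMESTAMP(),
  RESULT_LIMIT => 100,
  INTEGRATION_NAME => 'gitlab_data_notification_int'));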

Triage errors

When we get an alert in Slack, we should go to the runbooks page and analyze the issue.
