

Either way, the last step is creating the task. A task triggers on a schedule only; this task is scheduled to execute at a 1-minute interval. Its WHEN condition uses the SYSTEM$STREAM_HAS_DATA function, which indicates whether the specified stream contains any change-tracking data, and the condition is evaluated every minute: if the result is TRUE, the task runs and executes the procedure; if the result is FALSE, the task does not run. To use serverless tasks, we should request Snowflake Support to enable them for the relevant account.
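The task statement itself is not shown in this excerpt; a minimal sketch consistent with the description might look like the following (the task name TASK_PRODUCT_UPDATE and warehouse name COMPUTE_WH are assumptions; the stream and procedure names match the examples elsewhere in this article):

```sql
-- Hypothetical task definition; the task and warehouse names are assumptions.
CREATE OR REPLACE TASK TASK_PRODUCT_UPDATE
  WAREHOUSE = COMPUTE_WH  -- for a serverless task, omit WAREHOUSE and
                          -- optionally set USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE
  SCHEDULE = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('ITEM_UPLOAD_STREAM')
AS
  CALL "UDF_PRODUCT_UPDATE"();

-- Tasks are created in a suspended state; resume to start the schedule.
ALTER TASK TASK_PRODUCT_UPDATE RESUME;
```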

Snowflake tasks can be created either as serverless tasks or as user-managed tasks. Serverless tasks are still in preview mode.
CREATE OR REPLACE STREAM ITEM_UPLOAD_STREAM ON TABLE ITEM_STAGE

SHOW STREAMS

The next step is creating the procedure and the task. A Snowflake task is capable of calling a procedure once changed data is captured in the stream. The procedure uses a merge script to insert new records into the destination table and to update the existing records. Since the staging table is loaded in a truncate-insert manner, the stream captures each load as deletes and inserts; therefore, by adding the METADATA$ACTION = 'INSERT' condition to the script, we can filter only the inserts recorded in the stream.

CREATE OR REPLACE PROCEDURE "UDF_PRODUCT_UPDATE"()
RETURNS VARCHAR(16777216)
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
BEGIN
  MERGE INTO ITEM p
  USING ITEM_UPLOAD_STREAM ps
    ON p.ID = ps.ID
  WHEN MATCHED AND METADATA$ACTION = 'INSERT' THEN
    UPDATE SET p.NAME = ps.NAME,
               p.CATEGORY = ps.CATEGORY,
               p.UPDATE_DATE_TIME = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED AND METADATA$ACTION = 'INSERT' THEN
    INSERT (ID, NAME, CATEGORY)
    VALUES (ps.ID, ps.NAME, ps.CATEGORY);
END;
$$

To check the stream functionality, we can insert data into the staging table and query the stream to confirm that the insertion was captured.

INSERT OVERWRITE INTO ITEM_STAGE VALUES (1, 'Product1', 'Category1')

SELECT * FROM ITEM_UPLOAD_STREAM

Following is the output of the select query on the stream.
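The original post shows the query result as a screenshot, which is not reproduced here. As a rough illustration (row values taken from the test insert; the METADATA$ROW_ID value is elided), querying a stream returns the table's columns plus Snowflake's stream metadata columns:

```sql
SELECT * FROM ITEM_UPLOAD_STREAM;
-- Illustrative result shape (not the actual output):
-- ID | NAME     | CATEGORY  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
-- 1  | Product1 | Category1 | INSERT          | FALSE             | ...
```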

After stream creation, we can check the stream details using the SHOW STREAMS command.

Use case: a table is inserted with data multiple times a day in an ad hoc manner. Before every data insertion, the table is truncated and reloaded (let's call this table the staging table). A procedure has to be executed soon after the staging table is loaded with data. The procedure script should take the data in the staging table and compare it with another table (let's call this table the destination table): if a record in the staging table already exists in the destination table, that record should be updated; otherwise, it should be inserted into the destination table.

First, let's create the staging and destination tables.

//staging table
CREATE OR REPLACE TABLE ITEM_STAGE (
  ID NUMBER(38,0),
  NAME VARCHAR(20),
  CATEGORY VARCHAR(20)
)

//destination table
CREATE OR REPLACE TABLE ITEM (
  ID NUMBER(38,0),
  NAME VARCHAR(20),
  CATEGORY VARCHAR(20),
  INSERT_DATE_TIME TIMESTAMP_NTZ(9) DEFAULT CURRENT_TIMESTAMP(),
  UPDATE_DATE_TIME TIMESTAMP_NTZ(9)
)

The next step is creating the stream on the staging table. Snowflake streams keep track of DML changes made to tables, so by creating a stream on the staging table we can identify when records are inserted and take action on the changed data.
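As a closing sketch (not from the original article), here is one way to exercise the whole pipeline end to end, assuming the staging table, stream, procedure, and task described in this post have been created and the task has been resumed:

```sql
-- Hypothetical end-to-end check; assumes the objects from this article exist
-- and the task is resumed.
INSERT OVERWRITE INTO ITEM_STAGE VALUES (1, 'Product1', 'Category1');

-- After the task's next 1-minute run, the row should appear in ITEM
-- with INSERT_DATE_TIME populated:
SELECT ID, NAME, CATEGORY, INSERT_DATE_TIME, UPDATE_DATE_TIME FROM ITEM;

-- Reloading the staging table with changed values should update the same row
-- and set UPDATE_DATE_TIME on the task's next run:
INSERT OVERWRITE INTO ITEM_STAGE VALUES (1, 'Product1-new', 'Category2');
```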
