Reference

asbestos.asbestos_cursor()

Stand-in for Snowflake's Python Connector.

asbestos.asbestos.AsbestosConfig

The cursor in Asbestos takes a single argument: an instance of AsbestosConfig. The built-in asbestos_cursor() automatically uses a global config built just for this purpose.

Hey!

If you're using the default asbestos_cursor(), you do not need to create your own instance of this class. Just import the existing version from asbestos.config.

clear_queries()

Remove all the registered queries and responses.

register(query, response, data=None, force_pagination_size=None)

Asbestos watches for any call that includes the registered query and returns the response you saved. If data is registered with the query, the response is returned only when both the query and the data match exactly. A query registered without data matches any data passed in.

Example:

>>> config.register(query="A", response="B")
>>> cursor.execute("A")
>>> cursor.fetchone()
"B"  # This matches, so we get what we expect

Because our registered response doesn't have data attached to it, it will match all queries that are more specific.

>>> cursor.execute("A", ("AAA",))
>>> cursor.fetchone()
"B"

However, if our registered response is more specific than the query, it will not match.

>>> config.clear_queries()
>>> config.register(query="A", data="D", response="B")
>>> cursor.execute("A", ("D",))
>>> cursor.fetchone()
"B"  # this matches the saved query exactly, so we get the expected response
>>> cursor.execute("A")
>>> cursor.fetchone()
{}  # default response
>>> cursor.execute("A", ("E",))  # extra data doesn't match!
>>> cursor.fetchone()
{}  # default response

If you're dealing with a fetchmany() call and need to control how many results are returned regardless of what the code requests, pass the return count into force_pagination_size. This will override both cursor.arraysize and the number passed into fetchmany.

>>> config.register(
...     query="A",
...     response=[{'a': 1}, {'b': 2}],
...     force_pagination_size=1
... )
>>> cursor.execute("A")
>>> cursor.fetchmany(50)
[{'a': 1}]  # one row, because force_pagination_size overrides the requested 50
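
The override order can be modelled as a small pure function. This is only a sketch of the rule as this reference states it, not asbestos's internal code: the names effective_page_size and first_page are hypothetical, and the ordering between an explicit fetchmany size and cursor.arraysize is taken from the note in the fetchmany() section.

```python
from typing import Optional, Sequence


def effective_page_size(
    force_pagination_size: Optional[int],
    requested_size: Optional[int],
    arraysize: int,
) -> int:
    """Hypothetical helper: pick the page size by documented precedence."""
    # 1. A size forced at registration time wins over everything else.
    if force_pagination_size is not None:
        return force_pagination_size
    # 2. Otherwise an explicit fetchmany(size) argument is used.
    if requested_size is not None:
        return requested_size
    # 3. Otherwise fall back to cursor.arraysize.
    return arraysize


def first_page(
    rows: Sequence,
    force_pagination_size: Optional[int],
    requested_size: Optional[int],
    arraysize: int,
) -> list:
    """Hypothetical helper: slice the first page out of a saved response."""
    size = effective_page_size(force_pagination_size, requested_size, arraysize)
    return list(rows[:size])
```

With the registration shown above (a forced size of 1 and a requested size of 50), this model yields a single row.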

Registering a query will return the sfqid of the query that was just registered. This can be used later with get_results_from_sfqid() and remove_query_by_sfqid() to manipulate the query after it's been registered.

register_ephemeral(query, response, data=None, force_pagination_size=None)

Works the same way as AsbestosConfig.register() with the only difference being that after this query is called, it is removed from the list. This can be used for tests or other situations where you need a call to succeed only once.

remove_query_by_sfqid(sfqid)

New in 1.6.0!

If you need to remove a specific registered query (ephemeral or not), pass its sfqid in here. Returns True if the query was found and removed.

>>> query_id = config.register(query="A", response="B")
>>> len(config.query_map)
1  # one query registered
>>> config.remove_query_by_sfqid(query_id)
True  # the query was found and removed
>>> len(config.query_map)
0  # no queries left
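
One way to use the returned sfqid is to scope a registration to a block of test code. The sketch below assumes only the two calls documented here, register() and remove_query_by_sfqid(); the temporary_query name is hypothetical and is not part of asbestos.

```python
import contextlib


@contextlib.contextmanager
def temporary_query(config, query, response, data=None):
    """Hypothetical helper: register a query, then remove it when the block ends.

    `config` is expected to behave like AsbestosConfig: register() returns
    the sfqid, and remove_query_by_sfqid() removes that registration.
    """
    sfqid = config.register(query=query, response=response, data=data)
    try:
        yield sfqid
    finally:
        # Runs even if the block raises, so the query never leaks
        # into later tests.
        config.remove_query_by_sfqid(sfqid)
```

Inside the block the query is available to any cursor that shares the config; after the block it is gone, whether or not it was ever executed.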

asbestos.asbestos.AsbestosCursor

Analogue for Snowflake's cursor object.

To construct a cursor manually, first create an instance of AsbestosConfig, then pass it into your cursor. The easiest approach is to mirror the way Asbestos builds its own default cursor:

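import contextlib
from collections.abc import Iterator

from asbestos import AsbestosCursor, AsbestosConfig


myconfig = AsbestosConfig()

# set up myconfig here
...

@contextlib.contextmanager
def my_custom_cursor() -> Iterator[AsbestosCursor]:
    # the generator yields the cursor; the decorator turns it into a context manager
    yield AsbestosCursor(config=myconfig)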

sfqid property

Retrieve the ID of the last-run query or None.

In Snowflake, the sfqid is an ID that's attached to every query that's run. In asbestos, it's a random number attached to a query. If you pass this value to get_results_from_sfqid(), it will reset the cursor to the values from that query to effectively run it again.

close()

Reset the queries and "close" the connection.

execute(query, inserted_data=None)

Pass SQL to asbestos for processing.

Saves the SQL and any passed-in data to the cursor and starts the process of finding your pre-saved response. To get the data, you will need to call one of the fetch* methods listed below.

execute_async(*args, **kwargs)

Functions the same as .execute().

fetchall()

Return the entire saved response.

Whereas fetchone returns only the first entry from a saved response, fetchall returns every entry. Example:

config.register(query="A", response=[{'a':1}, {'b': 2}])
with asbestos_cursor() as cursor:
    cursor.execute("A")
    resp = cursor.fetchall()

# resp = [{'a':1}, {'b': 2}]

fetchmany(size=None)

Return a paged subset of the saved response.

Uses cursor.arraysize to control the page size, or you can pass your requested page size directly: .fetchmany(200). The response is paginated automatically, so you can keep calling fetchmany until it returns an empty list. Example:

config.register(query="A", response=[{'a':1}, {'b': 2}])

with asbestos_cursor() as cursor:
    cursor.arraysize = 1  # return one response at a time
    cursor.execute("A")
    cursor.fetchmany()  # [{'a':1}]
    cursor.fetchmany()  # [{'b': 2}]
    cursor.fetchmany()  # []

Note: a value passed directly into `fetchmany` overrides the
`cursor.arraysize`, and both of them are overridden by registering
the query with `force_pagination_size`.
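
Because fetchmany returns an empty list once the response is exhausted, code under test usually drains it in a loop. A minimal sketch of that loop follows; fetch_all_pages is a hypothetical helper name, and it relies only on the fetchmany(size=None) signature documented above.

```python
def fetch_all_pages(cursor, page_size=None):
    """Hypothetical helper: call fetchmany() until it returns an empty page."""
    rows = []
    while True:
        # With no explicit size, the cursor falls back to cursor.arraysize.
        if page_size is None:
            page = cursor.fetchmany()
        else:
            page = cursor.fetchmany(page_size)
        if not page:
            # An empty page signals that the saved response is exhausted.
            return rows
        rows.extend(page)
```

If the query was registered with force_pagination_size, each page is that size regardless of page_size, but the loop still terminates on the first empty page.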

fetchone()

Return the first result from the saved response for a query.

fetchone takes a pre-saved query and only gives the first piece of data from the response. Example:

config.register(query="A", response=[{'a':1}, {'b': 2}])
with asbestos_cursor() as cursor:
    cursor.execute("A")
    resp = cursor.fetchone()

# resp = {'a':1}

get_results_from_sfqid(query_id)

Resets the last-run information to the given query ID.

If you pass a sfqid from a previously-run query, it will rebuild the cursor using the data from that query, effectively allowing you to run the query again without calling .execute().

Watch out!

Ephemeral queries are only recoverable if they are the last query you ran. After you run another query, it will be fully replaced and cannot be recovered via its ID.
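
The sfqid property and get_results_from_sfqid() can be combined to re-read an earlier result after other queries have run. The sketch below uses only cursor members documented on this page; the helper names execute_and_remember and replay are hypothetical.

```python
def execute_and_remember(cursor, query, data=None):
    """Hypothetical helper: run a query and return the sfqid it was given."""
    cursor.execute(query, data)
    return cursor.sfqid


def replay(cursor, sfqid):
    """Hypothetical helper: point the cursor back at an earlier query.

    No second execute() call is made; the cursor is rebuilt from the
    stored query ID and the saved response is fetched again.
    """
    cursor.get_results_from_sfqid(sfqid)
    return cursor.fetchall()
```

Per the warning above, this should not be relied on for an ephemeral query once a later query has been executed.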

asbestos.asbestos.EphemeralContext

New in 1.6.0!

Ephemeral queries are, by definition, single-use. However, ephemeral queries that are paginated are not removed from the list until they are fully exhausted.

There are also situations where you may want to have regular queries behave like ephemeral queries, only slightly longer-lasting. The EphemeralContext context manager allows you to wrap a block of code and have all queries run within it removed after the block is done.

>>> config.register(query="A", response="B")
>>> config.register(query="C", response="D")
>>> len(config.query_map)
2  # two queries registered
>>> with EphemeralContext(config):
...     with asbestos_cursor() as cursor:
...         cursor.execute("A")
...         cursor.fetchone()
"B"  # this is the response from the first query
>>> len(config.query_map)
1  # the query that was run has been removed, but the other remains

This works for unlimited numbers of queries of any type.

asbestos.asbestos.AsbestosConn

Connector to house the Asbestos Cursor object.

The real Snowflake connector can be used in two ways: through a cursor object directly (cursor.execute()) or through a connection object that creates the cursor (conn.cursor().execute()). Because Asbestos aims to be a more-or-less drop-in replacement, it provides an optional connector that can be used to spawn the default cursor.

The AsbestosConn doesn't use the default config!

If you want to use this, you will need to configure it through the config object that is located on this instance. Example:

from asbestos import AsbestosConn

myconn = AsbestosConn()
myconn.config.register("select * from...", {"MY_EXPECTED": "DATA"})
with myconn.cursor() as cursor:
    ...

close()

Reset the queries and "close" the connection.

get_query_status(*args)

Check on a currently-running async query.

Returns 2, the Snowflake QueryStatus Success value.

is_still_running(*args)

Check long-running async queries. Will always return False.
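
These two methods exist so that the usual Snowflake polling loop for async queries keeps working under test. A minimal sketch of such a loop is shown here; wait_for_query, poll_seconds, and max_polls are hypothetical names chosen for illustration, and only get_query_status() and is_still_running() come from this reference.

```python
import time


def wait_for_query(conn, sfqid, poll_seconds=1.0, max_polls=60):
    """Hypothetical helper: poll until the query stops running.

    Returns True once the connection reports the query is no longer
    running, or False if max_polls checks pass without that happening.
    """
    for _ in range(max_polls):
        status = conn.get_query_status(sfqid)
        if not conn.is_still_running(status):
            return True
        # Still running: wait before asking again.
        time.sleep(poll_seconds)
    return False
```

Against an AsbestosConn, is_still_running always returns False, so this returns on the first check without sleeping.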

asbestos.exceptions

AsbestosDuplicateQuery

Bases: Exception

This exception is raised if .register() or .register_ephemeral() is used to register a query that duplicates an existing one.

AsbestosMissingConfig

Bases: Exception

This exception is raised if a new cursor is created without passing in an instance of AsbestosConfig.