flowthings¶
-
class
flowthings.
Token
(account, token)¶ Token objects may be passed to an
API
. This will be used to sign requests to the platform.>>> creds = Token('<account>', '<token>')
-
static
from_bluemix
(default=None, env_var='VCAP_SERVICES')¶ Loads a
Token
object from an IBM Bluemix environment. If adefault
Token
is provided, it will be returned in case of a failure, otherwise aFlowThingsError
will be raised.
-
static
-
class
flowthings.
API
(creds, async_lib=DEFAULT, secure=DEFAULT, params=DEFAULT)¶ Creates a new API instance for interacting with the platform.
>>> api = API(creds)
Options labelled as
DEFAULT
will use the options set in thedefaults
configuration object.An
API
is comprised of services for querying the different domains on the platform:flow
drop
track
group
identity
api_task
mqtt_task
rss_task
token
share
device
statistics
websocket
For documentation on these services, read Service Methods, Authentication, Statistics, Aggregation, and WebSockets.
-
async
([pool])¶ Returns an API wrapper for making asynchronous requests using either
eventlet
orgevent
. Requests made using anasync()
API will return green threads.For more documentation, read Asynchronous and Parallel Requests.
-
lazy
([pool])¶ Returns an API wrapper for making implicitly parallel requests using either
eventlet
orgevent
. Requests made using alazy()
API will return thunks that wait on their respective green thread when accessed.For more documentation, read Asynchronous and Parallel Requests.
-
request
(method, path, data=None, params=None)¶ Parameters: - method (str) – HTTP method
- path (str) – Request path
- data (dict) – Request data
- params (dict) – Request query parameters
Makes an arbitrary platform request.
-
flowthings.
defaults
¶ Configuration object for globally setting default options for
API
instances.-
defaults.
async_lib
¶ Defaults to
None
. Supportseventlet
andgevent
.import eventlet flowthings.defaults.async_lib = eventlet
-
defaults.
secure
¶ Defaults to
True
. When set toFalse
, requests will be made overhttp://
rather thanhttps://
.
-
defaults.
params
¶ The default set of query string parameters sent with all requests. Defaults to
{}
.
-
Service Methods¶
All API
service requests return plain dictionaries of the request
body. They may throw exceptions in case of an error.
-
service.
read
(id, **params)¶ Parameters: id (str) – The resource id >>> api.flow.read('<flow_id>')
-
service.
read_or_else
(id, default=None, **params)¶ Parameters: - id (str) – The resource id
- default (any) – Default value when the resource is not found
>>> api.flow.read_or_else('<flow_id>', None)
-
service.
read_many
(ids, **params)¶ Parameters: ids (list) – List of resource ids >>> api.flow.read_many(['<flow_id_1>', '<flow_id_2'])
-
service.
find_many
(*filters, **params)¶ Parameters: filters (Filter) – Request filters >>> api.flow.find_many(mem.displayName == 'Foo')
-
service.
find
(..., **params)¶ An overloaded method which may call one of
read()
,read_many()
, orfind_many()
depending upon the type of the first argument.>>> api.flow.find('<flow_id>') >>> api.flow.find(['<flow_id_1>', '<flow_id_2']) >>> api.flow.find(mem.displayName == 'Foo')
-
service.
create
(model, **params)¶ Parameters: model (dict) – Initial data for a new resource >>> api.flow.create({'path': '/path/to/flow'})
-
service.
update
(model, **params)¶ Parameters: model (dict or M
) – Updated modelRequests are made based on the model’s
'id'
key.>>> api.flow.update({'id': '<flow_id>', 'displayName': 'Foo'}) >>> api.flow.update(M(model, displayName='Foo'))
-
service.
update_many
(models, **params)¶ Parameters: models (list) – List of updated models
-
service.
save
(..., **params)¶ An overloaded method which may call one of
create()
,update()
, orupdate_many()
depending upon the type of the first argument.create()
orupdate()
are called based on the presence of an'id'
key.
-
service.
delete
(id, data=None, **params)¶ Parameters: - id (str) – The resource to delete
- data (any) – Request data
>>> api.flow.delete('<flow_id>')
Note
The drop
service is slightly different in that it must first be
parameterized by the Flow id.
>>> api.drop('<flow_id>').find(limit=10)
Request Parameters¶
Service methods take additional keyword arguments that act as query parameters on the requests. These are not fixed in any way, so please refer to the platform documentation for the options.
Note
When a request is made with the refs
parameter set to True
, the return
type becomes a tuple rather than a single dictionary:
>>> resp, refs = api.flow.find('<flow_id>', refs=True)
Request Filters¶
Service find methods
understand a query DSL that
lets you express filters using Python operations instead of manually splicing
strings together.
>>> api.flow.find(mem.displayName == 'foo', mem.path.re('^/foo', 'i'))
-
class
flowthings.
mem
¶ mem represents members of the objects you are querying. You can use use properties or key indexing to represent a member.:
>>> api.drop(<flow_id>).find(mem.elems.foo > 12)
The supported operators are
==
,<
,<=
,>
, and>=
along with the following methods, mirroring the platform:-
re
(pattern[, flags])¶
-
IN
(*items)¶
-
CONTAINS
(*items)¶
-
WITHIN
(distance, unit[, coords=(lat, lon)[, zip=zipcode]])¶
-
Additional platform filter operations are supported:
-
flowthings.
EXISTS
(member)¶
-
flowthings.
HAS
(elem_type)¶
-
flowthings.
MATCHES
(pattern[, flags])¶
-
flowthings.
NOT
(filter)¶
-
flowthings.
AGE
¶ Age comparisons can be made using normal python operators with
AGE
.:>>> api.flow.find(AGE > time_millis)
Boolean operations are supported on filters using AND
and OR
.:
>>> api.flow.find((mem.displayName == 'foo').OR(mem.displayName == 'bar'))
Authentication¶
If you create your API
using a master token, you can create and
manage tokens and shares.
-
api.token.
create
(model, **params)¶
Both tokens and shares support find
and delete
methods like other
services. They are, however, immutable and do not support updates.
Statistics¶
-
api.statistics.
flow_drop_added
(flow_id, year=None, month=None, day=None, level=None)¶
-
api.statistics.
flow_tracked
(flow_id, year=None, month=None, day=None, level=None)¶
-
api.statistics.
track_hit
(track_id, year=None, month=None, day=None, level=None)¶
-
api.statistics.
track_pass
(track_id, year=None, month=None, day=None, level=None)¶
-
api.statistics.
api_call_by_identity
(identity_id, year=None, month=None, day=None, level=None)¶
-
api.statistics.
drop_created_by
(identity_id, year=None, month=None, day=None, level=None)¶
Aggregation¶
-
api.
drop
(flow_id).aggregate(outputs, group_by=None, filter=None, rules=None, sorts=None)¶
Both filter
and rules
support Request Filters.:
>>> api.drop(flow_id).aggregate(['$avg:test'], rules={'test': mem.foo > 42})
Exceptions¶
-
class
flowthings.
FlowThingsError
¶
-
class
flowthings.
FlowThingsException
¶ -
errors
¶ List of errors returned from the platform
-
creds
¶ Request credentials
-
method
¶ Request HTTP method
-
path
¶ Request path
-
-
class
flowthings.
FlowThingsBadRequest
¶
-
class
flowthings.
FlowThingsForbidden
¶
-
class
flowthings.
FlowThingsNotFound
¶
-
class
flowthings.
FlowThingsServerError
¶
Modifications¶
Service update methods
can also take an instance
of a modification helper called M
. It lets you gradually make
updates to a model and then extract the diff and model with the changes
applied.
When passed directly to an update method, only the changes will be sent to the server instead of the entire model.
Asynchronous and Parallel Requests¶
Two workflows are supported for making asynchronous and parallel requests.
The API.async()
workflow is an imperative API where requests are
queued internally. Once you’ve made all the requests you need, you can invoke
the results()
method to wait. This can be useful when making large batches
of similar requests:
paths = [...]
async_api = api.async()
for path in paths
async_api.flow.find(mem.path == path)
for flows in async_api.results():
# Do something with the flows
pass
If some of your requests might fail, and you want to know which ones, you may
set the with_exceptions
keyword argument:
flows = [...]
async_api = api.async()
for flow in flows:
async_api.drop(flow['id']).find(limit=10)
for e, drops in async_api.results(with_exceptions=True):
if e:
# Do something if there was an error
pass
else:
# Do something with the drops
pass
The API.lazy()
worklow is useful when building complex compositions of
dependent requests which can benefit from implicit parallelization. All
requests are executed in parallel, but wait when you try to read the data. This
works by requests returning a GreenThunk
, which is a MutableMapping
around a green thread. This object acts just like a regular dictionary or list,
but waits on the green thread before performing any look-ups or mutations.
lazy_api = api.lazy()
flow_a = lazy_api.flow.find(mem.path == '/path/to/flow_a')
flow_b = lazy_api.flow.find(mem.path == '/path/to/flow_b')
drops = lazy_api.drop(flow_a[0]['id']).find(limit=10)
In this example, the two requests for Flows are performed in parallel, while
the requests for drops waits for the flow_a
request to complete first.
You can retrieve the pure data of a GreenThunk
by invoking its unwrap()
method.
Note
It is assumed the user has done the necessary green thread monkey-patching
for their chosen library before importing the flowthings
package.
WebSockets¶
WebSockets are supported using the websocket-client
package. Here is a
short example:
def on_open(ws):
ws.subscribe('<flow_id>')
def on_message(ws, resource, data):
print 'Got message:', resource, data
def on_close(ws):
print 'Closed'
def on_error(ws, e):
print 'Error:', e
ws = api.websocket.connect(on_open=on_open,
on_message=on_message,
on_close=on_close,
on_error=on_error)
ws.run()
Examples¶
from flowthings import API, Token, mem
creds = Token('<account_name>', '<token_string>')
api = API(creds)
# Get a Flow by id
api.flow.find('<flow_id>')
# Get a Flow by path
api.flow.find(mem.path == '<flow_path>')
# Get 10 recent Flows, with references
flows, refs = api.flow.find(limit=10, refs=True)
# Create a flow
api.flow.create({ 'path': '<flow_path' })
# Delete a flow
api.flow.delete('<flow_id>')
# Get drops in a flow
api.drop('<flow_id>').find()
# Filter drops in a flow
api.drop('<flow_id>').find(mem.elems.foo == 'value')