
Read and write to Java data stream using Python

Recently at work I came across a file created in Java that consisted of primitive Java values saved using Java’s DataOutputStream. I wanted to read the file using Python but couldn’t find any existing library for this, so I wrote a simple library for reading and writing a binary format that is compatible with Java’s DataOutputStream and DataInputStream.

It is very simple but supports most of the operations for reading and writing a Java stream; hopefully it can be of some use if someone needs to read or write Java-compatible streams.

You use it in almost the same way as the Java classes, but the methods have more “Pythonic” names.

# DataOutputStream and DataInputStream below come from the linked library
with open('/tmp/stream', 'wb') as f:
    dos = DataOutputStream(f)
    dos.write_int(12345)
    dos.write_utf('hello world')

with open('/tmp/stream', 'rb') as f:
    dis = DataInputStream(f)
    val = dis.read_int()
    string = dis.read_utf()
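
Under the hood the format is straightforward: DataOutputStream writes primitives in big-endian byte order, and writeUTF prefixes the string with a two-byte length. As a rough illustration (this is not the library’s actual code, just a sketch of the format), the stream written above could also be read with the struct module alone:

import struct

# Java's writeInt stores a 4-byte big-endian int; writeUTF stores a 2-byte length
# followed by (modified) UTF-8 bytes -- plain UTF-8 decoding works for ASCII data.
with open('/tmp/stream', 'rb') as f:
    val = struct.unpack('>i', f.read(4))[0]
    length = struct.unpack('>H', f.read(2))[0]
    string = f.read(length).decode('utf-8')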

If you are planning on using it for something important I would recommend that you do some more testing and verification that it works; a unit test exists but it does not cover all situations, and there is no exception handling.

The complete code can be found here.

LockerRoom – Distributed lock manager in Python

A problem when running systems in a network environment is that you might need to control access to resources so that only one of your programs updates a resource at a time.

For this you could use a Distributed Lock Manager (DLM), which provides synchronised access to resources over a network. If you are running Memcache or Redis you can find DLM implementations here and here. But what if you want to use MongoDB for the task? Then LockerRoom might be for you!

LockerRoom is a simple implementation of a DLM that stores the locks in MongoDB, making them easily accessible over a network.

You initialise the lock manager like this:

import locker_room
locker = locker_room.LockerRoom(host='server1')

Here, host is the hostname of the server running MongoDB. You can also specify which database and which collection to store the locks in if you don’t want the default values.
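
For example (the exact parameter names here are an assumption on my part, check the source for the actual signature):

# hypothetical parameter names -- see the source for the real ones
locker = locker_room.LockerRoom(host='server1', db='my_db', collection='my_locks')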

To get a lock, it is recommended to use locker either as a context manager or as a function decorator:

with locker.lock_and_release('my_lock', owner='gustav', timeout=2):
    # do important stuff

or

@locker.lock_and_release('my_lock')
def important_function():
    # do important stuff

But it is also possible to just call lock and release separately:

locker.lock('my_lock', timeout=2)
# do stuff
locker.release('my_lock')

The lock methods take optional values for timeout (how long to wait to acquire the lock before an exception is raised) and for owner (who owns the lock).

You can also see the status of a lock, i.e. whether it is locked, when the lock was set and who owns it, by calling the status method:

locker.status('my_lock')
>> {u'owner': u'gustav', u'timestamp': datetime.datetime(2014, 4, 17, 14, 6, 8, 291000), 
    u'_id': u'my_lock',  u'locked': True}

The complete source code can be found here.

Neural Networks using Pylearn2 – termination criteria, momentum and learning rate adjustment

A while ago I wrote a post describing how to use Pylearn2 for training neural networks. By my very modest standards it became quite popular, so I thought I should follow it up with an example that introduces more advanced termination criteria, momentum and learning rate adjustment.

It should be noted that this post is mostly aimed at people who do not want to use Pylearn2 in the recommended way, which is to use YAML files for setting up the training configuration. If you have the choice, using YAML is much easier, and documentation for that is available on the Pylearn2 website.

This tutorial will show how to use some more advanced techniques available in Pylearn2 for solving the Pima Indians Diabetes problem. This is a binary classification problem, and to run the code you will need to download the dataset, which can be found here.

We create the dataset by reading it from file. Here we have also defined a split method for splitting the dataset into two parts:

import itertools
import numpy as np
from pylearn2.datasets.dense_design_matrix import DenseDesignMatrix

# PIMA_DATASET is the path to the downloaded dataset file; scaler is assumed to
# be a feature scaler, for instance sklearn.preprocessing.StandardScaler()
class Pima(DenseDesignMatrix):
    def __init__(self, X=None, y=None):
        if X is None:
            X = []
            y = []
            with open(PIMA_DATASET) as f:
                for line in f:
                    features, label = line.rsplit(',', 1)
                    X.append(map(float, features.split(',')))
                    if int(label) == 0:
                        y.append([1, 0])
                    else:
                        y.append([0, 1])
            X = np.asarray(X)
            X = scaler.fit_transform(X)
            y = np.asarray(y)
        super(Pima, self).__init__(X=X, y=y)

    @property
    def nr_inputs(self):
        return len(self.X[0])

    def split(self, prop=.8):
        cutoff = int(len(self.y) * prop)
        X1, X2 = self.X[:cutoff], self.X[cutoff:]
        y1, y2 = self.y[:cutoff], self.y[cutoff:]
        return Pima(X1, y1), Pima(X2, y2)

    def __len__(self):
        return self.X.shape[0]

    def __iter__(self):
        return itertools.izip_longest(self.X, self.y)

We create three datasets for training, validation and test, and initialise a hidden sigmoid layer with 20 neurons and a 2-neuron softmax output layer with the name “output”.

ds_train = Pima()
ds_train, ds_valid = ds_train.split(0.7)
ds_valid, ds_test = ds_valid.split(0.7)
hidden_layer = mlp.Sigmoid(layer_name='hidden', dim=20, irange=.05, init_bias=1.)
output_layer = mlp.Softmax(2, 'output', irange=.05)
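
The network itself (the variable ann used below) is not constructed in the snippets in this post; following the earlier XOR example it would be assembled from the two layers roughly like this, with nvis set to the number of input features (8 for the Pima dataset):

layers = [hidden_layer, output_layer]
ann = mlp.MLP(layers, nvis=ds_train.nr_inputs)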

Termination criteria

In my previous post I described the simplest possible termination criterion, which just stops after a given number of epochs. A more interesting criterion is to halt training once it has stopped improving. This can be done with a monitor-based criterion that listens on a certain channel and measures how some value on that channel is changing. In this example our monitor-based criterion measures the classification error on the output layer (the channel name for this is “<layer name>_misclass”, so in our case “output_misclass”) and stops after 50 epochs without any improvement.

termination_criterion = MonitorBased(channel_name='output_misclass',
                                     N=50, prop_decrease=0.0)

(It should be noted at this point that I have not put any work into finding optimal values for the hyperparameters in this tutorial, so they might be quite far from optimal.)

Momentum

Momentum preserves some of the network’s weight updates from one epoch to the next and can help avoid getting stuck in a local minimum. A momentum of m means that a fraction m of the previous weight update is added to the current one. It is common to adjust the momentum during training by starting with a lower momentum and increasing it as the training settles towards a (hopefully global) minimum.

In Pylearn2 this is easily done by defining a momentum learning rule and a momentum adjustor:

initial_momentum = .5
final_momentum = .99
start = 1
saturate = 50
momentum_adjustor = learning_rule.MomentumAdjustor(final_momentum, start, 
                                                   saturate)
momentum_rule = learning_rule.Momentum(initial_momentum)

We start with a momentum of 0.5 and gradually adjust it between epochs 1 and 50 so that it reaches 0.99 at epoch 50.

Learning rate adjustment

As in the previous post, we will use the Stochastic Gradient Descent algorithm for training the network. This algorithm has a learning rate parameter that determines the size of the weight changes from one iteration to the next. A smaller learning rate makes the learning slower but more precise, since it takes smaller steps on each iteration. A larger value gives faster learning but can cause it to overshoot and miss the optimum.

A good strategy can therefore be to start with a larger learning rate and decrease it as the learning gets closer to the optimum.

In Pylearn2 this is done by using a learning rate adjustor:

start = 1
saturate = 50
decay_factor = .1
learning_rate_adjustor = sgd.LinearDecayOverEpoch(start, saturate, decay_factor)

This adjustor decreases the learning rate linearly between epochs 1 and 50, so that at epoch 50 it has reached 10% (the decay_factor) of its initial value.

Training algorithm

We create the trainer like this:

trainer = sgd.SGD(learning_rate=.05, batch_size=10, monitoring_dataset=ds_valid,
                  termination_criterion=termination_criterion, 
                  learning_rule=momentum_rule)
trainer.setup(ann, ds_train)

Note that we have added the validation dataset as monitoring_dataset, so the monitor-based termination criterion is measured against it. If we had used the training set as monitoring_dataset it could easily lead to overfitting, since we would then both train and measure performance against the same data.
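
If I remember correctly, monitoring_dataset can also be given as a dict of named datasets, in which case the channel names are prefixed with the dataset name. A hedged sketch of what that would look like (the 'valid_output_misclass' channel name follows from that prefixing convention):

# assuming monitoring_dataset accepts a dict of named datasets
trainer = sgd.SGD(learning_rate=.05, batch_size=10,
                  monitoring_dataset={'train': ds_train, 'valid': ds_valid},
                  termination_criterion=MonitorBased(channel_name='valid_output_misclass',
                                                     N=50, prop_decrease=0.0),
                  learning_rule=momentum_rule)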

We need a way to keep track of the best model found during training (measured against the validation set). This is done with a monitor that saves the best model to a file that we can later read back to get the globally best model:

monitor_save_best = best_params.MonitorBasedSaveBest('output_misclass',
                                                     '/tmp/best.pkl')

We are now ready to start training:

while True:
    trainer.train(dataset=ds_train)
    ann.monitor.report_epoch()
    ann.monitor()
    monitor_save_best.on_monitor(ann, ds_valid, trainer)
    if not trainer.continue_learning(ann):
        break
    momentum_adjustor.on_monitor(ann, ds_valid, trainer)
    learning_rate_adjustor.on_monitor(ann, ds_valid, trainer)

After each epoch we:

  1. Check if the current model is the best found so far and, if so, save it.
  2. Check whether the termination criterion has been met; if so, we are finished.
  3. Adjust momentum and learning rate for the next epoch.

When the training is done we need to load the globally best model:

ann = serial.load('/tmp/best.pkl')

And then we can finally evaluate this model on the datasets. For this we create a helper function that classifies an input vector and returns the prediction, and a function for testing a model’s accuracy on a given dataset:

def classify(inp):
    inp = np.asarray(inp)
    inp.shape = (1, ds_train.nr_inputs)
    return np.argmax(ann.fprop(theano.shared(inp, name='inputs')).eval())

def score(dataset):
    nr_correct = 0
    for features, label in dataset:
        if classify(features) == np.argmax(label):
            nr_correct += 1
    print '%s/%s correct' % (nr_correct, len(dataset))
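
The accuracy numbers below presumably come from calling score on each of the three splits, along the lines of:

print 'Accuracy of train set:'
score(ds_train)
print 'Accuracy of validation set:'
score(ds_valid)
print 'Accuracy of test set:'
score(ds_test)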

Running the complete code (which can be found here) yields the following result (at least for me):

Accuracy of train set:
398/537 correct
Accuracy of validation set:
136/161 correct
Accuracy of test set:
52/70 correct

 

Neural network example using Pylearn2

I was recently looking into using a neural network for a project so I started looking into some of the available Python libraries. The one I ended up using was Pylearn2 which is a fast and powerful library for machine learning that is mainly built upon Theano.

Pylearn2 is under development and still a bit rough around the edges; the documentation is limited and in some instances not correct. The recommended way of using it is by writing YAML scripts, and if you are OK with that you can probably manage with the existing documentation. But if you, like me, want to use it as a standard Python library you had better be prepared to read the code. One thing that would have saved me some time was a complete example of how to use Pylearn2 as a standalone library, so what follows is a simple example of creating a neural network for solving the XOR problem.

The XOR problem is stated as follows: create a neural network that, given two binary inputs (0 or 1), outputs 1 if exactly one of the inputs is 1, and 0 otherwise.

Pylearn2 has a dataset implementation that in its simplest form needs a collection of datapoints in a 2D Numpy array named X and a 2D array named y containing the answers. We can create a dataset by creating a new class that inherits from DenseDesignMatrix:

import numpy as np
from random import randint
from pylearn2.datasets.dense_design_matrix import DenseDesignMatrix

class XOR(DenseDesignMatrix):
    def __init__(self):
        self.class_names = ['0', '1']
        X = [[randint(0, 1), randint(0, 1)] for _ in range(1000)]
        y = []
        for a, b in X:
            if a + b == 1:
                y.append([0, 1])
            else:
                y.append([1, 0])
        X = np.array(X)
        y = np.array(y)
        super(XOR, self).__init__(X=X, y=y)

ds = XOR()

Note that we are using two columns in the target variable y: a 1 in the first column signifies an output of 0 and a 1 in the second column signifies an output of 1.

Next we need to create the layers in the neural net. To be able to solve the XOR problem we need a hidden layer with at least two neurons:

hidden_layer = mlp.Sigmoid(layer_name='hidden', dim=2, irange=.1, init_bias=1.)

The hidden layer uses a standard sigmoid activation function and the weights are initialized in the range -0.1 to 0.1 (using the irange argument). We also add a bias to the two neurons with value 1.0.

We use a softmax layer with two nodes as output layer. The output from the two nodes is between 0 and 1 and the sum of the output from all nodes in the layer is 1.

output_layer = mlp.Softmax(2, 'output', irange=.1)

To train the network we use a Stochastic Gradient Descent (SGD) method which we initialize like this:

trainer = sgd.SGD(learning_rate=.05, batch_size=10, termination_criterion=EpochCounter(400))

We use a simple termination criterion that runs for 400 epochs, more advanced termination criteria are of course available.

To initialize the neural network and set up the training we do the following:

layers = [hidden_layer, output_layer]
ann = mlp.MLP(layers, nvis=2)
trainer.setup(ann, ds)

We put the layers in the Multi-Layer Perceptron class with two inputs (nvis=2) and then set up the trainer with the network and the dataset.

We then train the neural network until the termination criterion is met:

while True:
    trainer.train(dataset=ds)
    ann.monitor.report_epoch()
    ann.monitor()
    if not trainer.continue_learning(ann):
        break

After the training is complete we of course want to test that it works. We do this by using the fprop method, which takes the inputs as a Theano variable:

inputs = np.array([[0, 1]])
print ann.fprop(theano.shared(inputs, name='inputs')).eval()

This should yield an answer like this:

[[ 0.00526688  0.99473312]]

Meaning that the network correctly predicts that the output should be a 1.
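
To be a bit more thorough we can check all four input combinations in the same way, reusing the fprop call from above:

for a, b in ((0, 0), (0, 1), (1, 0), (1, 1)):
    inputs = np.array([[a, b]], dtype='float64')
    prediction = ann.fprop(theano.shared(inputs, name='inputs')).eval()
    print a, b, '->', np.argmax(prediction)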

See here for the complete source code of the example.

Calculating volatility of multi-asset portfolio, example using Python

A standard way of measuring the risk you are taking when investing in an asset, say for instance a stock, is to look at the asset’s volatility. This can easily be calculated as the standard deviation of the asset’s daily returns. If we for instance have invested all of our money in Apple and have downloaded the historical price of the stock, we could do the following (the example needs Numpy to run):

import numpy as np
stock_prices = <Apple's historical stock price>
normalized_prices = np.asarray(stock_prices) / stock_prices[0]
daily_ret = [0.0]
for i in xrange(1, len(normalized_prices)):
  daily_ret.append(normalized_prices[i] / normalized_prices[i-1] - 1)
volatility = np.std(daily_ret)
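
Since the prices are already in a Numpy array, the same daily returns can also be computed without the explicit loop:

daily_ret = np.append([0.0], normalized_prices[1:] / normalized_prices[:-1] - 1)
volatility = np.std(daily_ret)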

This is all nice and easy when we are only looking at a single asset, in this case Apple. But if you are a bit more serious about your investments you probably understand the importance of diversifying your investments and hold a portfolio containing several stocks and/or other assets.

By diversifying your portfolio you can lower its volatility and, at least in theory, create a portfolio with lower volatility than any of the individual assets in it.

So assume for instance that our portfolio consists of three stocks: Microsoft, Apple and Kraft. Assume further that the weights of the three stocks in our portfolio are 0.3, 0.5 and 0.2, meaning that 30% of our money is invested in Microsoft, 50% in Apple and 20% in Kraft.

What, then, is the volatility of the whole portfolio? The naive way would be to take the weighted average of the volatilities of the individual stocks. The volatility of our portfolio, Vol(p), would then be calculated as:

Vol(p) = 0.3 * Vol(Microsoft) + 0.5 * Vol(Apple) + 0.2 * Vol(Kraft)

But this is wrong, dangerously wrong. What this method fails to take into account is the correlation between the stocks. Correlation tells us how the stocks move in relation to one another, both in terms of direction and of intensity. The correlation between two assets is given as a number between -1 and 1. If the correlation is 1, the two stocks move in perfect sync: if one of them gains 2% the other one will also gain 2%, and if one of them falls 5% the other will also fall 5%.

If the correlation is -1 they move in perfect sync but opposite each other. So when one of the stocks gains 3% the other falls 3%.

A correlation of zero means that there is no relation between how the two stocks move.
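
With Numpy the correlation between two return series is easy to compute; np.corrcoef returns the full correlation matrix, so the off-diagonal element is the pairwise correlation (using the per-asset return series that appear further down):

correlation = np.corrcoef(daily_returns_Apple, daily_returns_Microsoft)[0, 1]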

So a diversified portfolio should consist of assets that do not correlate “too much”. In our three-asset example we can assume that Microsoft and Apple have a strong positive correlation since they are in the same line of business, so holding both of them does not help much with diversifying the portfolio.

Our measure of volatility should therefore take the correlation between the assets into account. The equation for this gets quite hairy for portfolios larger than two or three assets, but fortunately for us we can use a matrix operation for the calculation. If we put the weights of the assets in an array w and compute the covariance matrix of the assets’ daily returns, cov_matrix (the covariance matrix combines each asset’s individual volatility with the correlation between each pair of assets), the variance of the portfolio’s daily returns can be expressed as:

Var(p) = w.T * cov_matrix * w

From this we calculate the volatility, i.e. the standard deviation, as:

Vol(p) = Sqrt(Var(p))

In Python we could do this calculation as follows, assuming we have calculated the daily return arrays for each asset as before and put them in the variable daily_returns:

daily_returns = [daily_returns_Microsoft, daily_returns_Apple, daily_returns_Kraft]

# create the covariance matrix (the correlations scaled by the individual volatilities)
cov_matrix = np.cov(daily_returns)

# portfolio weights
w = np.array([0.3, 0.5, 0.2])

# portfolio volatility
portfolio_volatility = np.sqrt(w.T.dot(cov_matrix).dot(w))
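
As a sanity check we can compare this against the naive weighted average of the individual volatilities; as long as the assets are not perfectly correlated, the portfolio volatility comes out lower:

# naive weighted average of the individual volatilities, for comparison
individual_vols = np.array([np.std(ret) for ret in daily_returns])
naive_volatility = w.dot(individual_vols)
print 'naive: %s, with correlations: %s' % (naive_volatility, portfolio_volatility)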

Handling MongoDB AutoReconnect-exceptions in Python using a proxy

When using MongoDB in a production environment you will almost always want to set up a replica set to get better persistence and read scaling. In a replica set you have one primary and one or more secondaries. Writes are always routed to the primary, so if something happens to the primary it becomes impossible to write to the database. When that happens a new primary is elected automatically (if possible).

During failover and election of a new primary, MongoDB raises an AutoReconnect exception in response to any operation on the primary to signal that the operation failed. Your code therefore needs to be prepared to handle this exception. Often the correct thing to do is to wait a little while and try the operation again, for example:

import time
import pymongo
db = pymongo.MongoReplicaSetClient(replicaSet='blog_rs').blogs

for i in range(5):
  try:
    db.posts.insert(post)
    break
  except pymongo.errors.AutoReconnect:
    time.sleep(pow(2, i))

This gets a bit annoying if you need to repeat the try/except for every line of code that calls MongoDB, so a standard way is to put it in a decorator:

def safe_mongocall(call):
  def _safe_mongocall(*args, **kwargs):
    for i in xrange(5):
      try:
        return call(*args, **kwargs)
      except pymongo.errors.AutoReconnect:
        time.sleep(pow(2, i))
    print 'Error: Failed operation!'
  return _safe_mongocall

You would then need to decorate all functions that call MongoDB:

@safe_mongocall
def insert_blog_post(post):
  db.posts.insert(post)

But another way to do it, which might be viewed as cleaner, is to create a proxy around the connection to MongoDB. That way, all handling of AutoReconnect exceptions can be moved to the proxy and you do not have to care about catching the exception in your code.

Let’s start by creating a class that can encapsulate any MongoDB method and handle AutoReconnect exceptions transparently using the decorator:

class Executable:
  def __init__(self, method):
    self.method = method

  @safe_mongocall
  def __call__(self, *args, **kwargs):
    return self.method(*args, **kwargs)

The Executable class implements the magic method __call__, which is invoked whenever an instance of the class is called, for example like this:

safe_post_insert = Executable(db.posts.insert)
safe_post_insert(post)

This will by itself not help us much, since we would need to create safe inserts, updates, etc. for every collection we want to use. The next step is therefore to create a proxy class that contains a MongoDB connection and encapsulates all executable methods automatically.

We start by defining which of MongoDB’s methods should be wrapped by the proxy class. We want to wrap all methods in pymongo, pymongo.Connection and pymongo.collection.Collection that do not start with “_”.

EXECUTABLE_MONGO_METHODS = set(typ for typ in dir(pymongo.collection.Collection)
                               if not typ.startswith('_'))
EXECUTABLE_MONGO_METHODS.update(typ for typ in dir(pymongo.Connection)
                                if not typ.startswith('_'))
EXECUTABLE_MONGO_METHODS.update(typ for typ in dir(pymongo)
                                if not typ.startswith('_'))

And now for the MongoProxy-class:

class MongoProxy:
    """ Proxy for MongoDB connection.
    Methods that are executable, i.e find, insert etc, get wrapped in an
    Executable-instance that handles AutoReconnect-exceptions transparently.

    """
    def __init__(self, conn):
        """ conn is an ordinary MongoDB-connection.

        """
        self.conn = conn

    def __getitem__(self, key):
        """ Create and return proxy around the method in the connection
        named "key".

        """
        return MongoProxy(getattr(self.conn, key))

    def __getattr__(self, key):
        """ If key is the name of an executable method in the    MongoDB connection, for instance find or insert, wrap this method in the Executable-class. 
        Else call __getitem__(key).

        """
        if key in EXECUTABLE_MONGO_METHODS:
            return Executable(getattr(self.conn, key))
        return self[key]

    def __call__(self, *args, **kwargs):
        return self.conn(*args, **kwargs)

The MongoProxy class is instantiated with a MongoDB connection object that is saved in self.conn. To create a safe connection to MongoDB we do the following:

safe_conn = MongoProxy(pymongo.ReplicaSetConnection(replicaSet='blogs'))

This safe_conn can then be used in exactly the same way as an ordinary MongoDB connection, with the added benefit of not having to deal with AutoReconnect exceptions.

Let’s take a closer look at what happens when we do an insert using our new safe connection:

safe_conn.blogs.posts.insert(post)

First the attribute blogs is accessed, which causes a call to __getattr__. Since blogs is not found in the set EXECUTABLE_MONGO_METHODS, the call is sent to __getitem__, which returns a new proxy around the internal MongoDB connection’s blogs attribute. This is then repeated for posts. We then get to the call to insert; this attribute is found in EXECUTABLE_MONGO_METHODS, so instead of returning another proxy we wrap the call to insert in the Executable class, which performs the actual insert.

You should also override the methods __dir__ and __repr__ to make the proxy more transparent:

def __dir__(self):
    return dir(self.conn)

def __repr__(self):
    return self.conn.__repr__()

The complete source code can be found here.

MongoDB finally changes default behavior of writes

MongoDB has finally relented and changed the default behavior for handling write errors in the new version of their client. Previously the default was not to wait and see if a write really succeeded; unless you explicitly asked for it, a failed write operation might go unnoticed.

MongoDB has gotten a lot of flak for the previous behavior, and it has been a big reason for some people to abandon MongoDB in favor of other database solutions. I have never been able to work up much sympathy for users who complain about having lost important data because they did not set the write behavior correctly for their application. Using a database in production without at least a basic understanding of how it works is a sure way of inviting trouble, regardless of which database you use.

But on the other hand I think it is a very good move by MongoDB, one they should have made long ago. Having writes fail silently by default is a bad idea that is really counterintuitive for most people. (The reason for the old behavior is clearer after having read the previously mentioned blog post, although the reason for the delay in making this change is not.)

I believe a lesson to be learned here is that if you are designing a system such as a database, you should think twice before choosing speed over safety as the default behavior. Most users probably want safety and predictable behavior by default and increased speed as an option, not the other way around. Especially when choosing between them is as easy as changing one argument.
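
To make the “one argument” concrete: as I remember the pymongo 2.x API, the old client could be asked to acknowledge writes by passing safe=True (per connection or per operation), while the newer MongoClient acknowledges writes by default. A sketch:

import pymongo

post = {'title': 'example'}

# old client: fire-and-forget unless you explicitly ask for acknowledgement
conn = pymongo.Connection('localhost', safe=True)   # per connection
conn.blogs.posts.insert(post, safe=True)            # or per operation

# new client: writes are acknowledged by default
client = pymongo.MongoClient('localhost')
client.blogs.posts.insert(post)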

 

MongoDB read preferences for replica sets

An important, but perhaps sometimes overlooked, parameter in MongoDB when using replica sets is the read preference. This parameter controls how reads are handled and is an improvement over the old (now deprecated) slave_okay parameter.

By default, all reads are routed to the primary. This might seem a bit counterproductive at first glance; wouldn’t it be better if reads were distributed over all instances in the replica set? In many cases that is true, but it is important to be aware of the consequences. If a secondary is falling behind the primary for some reason, a read from that secondary could return old data. Depending on your application, this might not be a problem. If you for example are reading log data for a report, it might not matter much if the data is a bit stale.

In previous versions of MongoDB you could set slave_okay to True to distribute reads over the secondaries. This parameter could be set per connection or per operation. Starting with version 2.2 of MongoDB you should instead use the parameter read_preference. Like slave_okay it can be set when connecting to the database:

import pymongo
conn = pymongo.Connection('localhost', read_preference=pymongo.ReadPreference.SECONDARY_PREFERRED)

or just on certain operations:

conn = pymongo.Connection('localhost')
conn.blogs.posts.find({'sid': 13214}, read_preference=pymongo.ReadPreference.SECONDARY_PREFERRED)

For replica sets, read_preference can take the following values:

  • PRIMARY – This is the default setting and routes all reads to the replica set primary. If the primary is unavailable for some reason, a read operation will produce an error or exception.
    This is the right setting if it is important never to return stale data.
  • PRIMARY_PREFERRED – Reads are normally sent to the primary, but if it is unavailable reads go to secondary members instead.
    A use case for this might be if you are using MongoDB as the backend for a web service that shows some kind of information to a customer. You want the information that is shown to be up to date, but in case of a primary failover you believe it is more important to show some data, stale or not.
  • SECONDARY – Reads are only allowed on secondary members of the replica set. If no secondary is available, a read operation will produce an error or exception.
    This might be useful if you have a heavy read load and it is important that read operations never interfere with the writes.
  • SECONDARY_PREFERRED – Reads are normally routed to a secondary, but if no secondary is available reads are sent to the primary. (This is how reads are handled when slave_okay is set to True.)
    A use case for this is when you are not that concerned about reading stale data and want to distribute the read operations over all set members.
  • NEAREST – Reads are performed on the nearest available set member, regardless of whether it is a primary or a secondary. Nearness is determined by periodically pinging all members and measuring the response time.
    This could be useful when you have a very read-heavy application, want to minimize network latency and do not care whether the data is stale.

Note that all preferences other than PRIMARY could give stale data.

Using the default PRIMARY read preference is often too limiting and could in many cases be replaced by at least PRIMARY_PREFERRED. If you for example are using MongoDB as the backend for a web service, it might often be better to risk presenting stale data to the frontend than no data at all, which could happen if the primary became unavailable.
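
For example, connecting with PRIMARY_PREFERRED so that reads fall back to a secondary if the primary is down:

conn = pymongo.Connection('localhost',
                          read_preference=pymongo.ReadPreference.PRIMARY_PREFERRED)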

Consuming Twitter’s Streaming API using Python and cURL

Twitter’s Streaming API provides developers access to a global stream of tweet data. By setting up a persistent HTTP connection to one of the streaming endpoints you will be pushed tweets and other messages. There are some good posts and tutorials on the web about how to use the Streaming API, and there are also libraries you can use. But if you want to roll your own code there are several things you need to think about to get it all right.

In this tutorial we will create a complete example of how to consume the public stream and get all tweets that mention the products iphone, ipad or ipod. We will connect to the stream using OAuth and deal with common errors and warnings.

Setting up the connection

Let’s start with some code for setting up the connection:

import time
import pycurl
import urllib
import json
import oauth2 as oauth

API_ENDPOINT_URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
USER_AGENT = 'TwitterStream 1.0' # This can be anything really

# You need to replace these with your own values
OAUTH_KEYS = {'consumer_key': <Consumer key>,
              'consumer_secret': <Consumer secret>,
              'access_token_key': <Token key>,
              'access_token_secret': <Token secret>}

# These values are posted when setting up the connection
POST_PARAMS = {'include_entities': 0,
               'stall_warnings': 'true',
               'track': 'iphone,ipad,ipod'}

class TwitterStream:
    def __init__(self):
        self.oauth_token = oauth.Token(key=OAUTH_KEYS['access_token_key'], secret=OAUTH_KEYS['access_token_secret'])
        self.oauth_consumer = oauth.Consumer(key=OAUTH_KEYS['consumer_key'], secret=OAUTH_KEYS['consumer_secret'])
        self.conn = None
        self.buffer = ''
        self.setup_connection()

    def setup_connection(self):
        """ Create persistant HTTP connection to Streaming API endpoint using cURL.
        """
        if self.conn:
            self.conn.close()
            self.buffer = ''
        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.URL, API_ENDPOINT_URL)
        self.conn.setopt(pycurl.USERAGENT, USER_AGENT)
        # Using gzip is optional but saves us bandwidth.
        self.conn.setopt(pycurl.ENCODING, 'deflate, gzip')
        self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.POSTFIELDS, urllib.urlencode(POST_PARAMS))
        self.conn.setopt(pycurl.HTTPHEADER, ['Host: stream.twitter.com',
                                             'Authorization: %s' % self.get_oauth_header()])
        # self.handle_tweet is the method that is called when new tweets arrive
        self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_tweet)

We start with defining some global parameters. Starting with version 1.1 of the Streaming API you need to authenticate the connection using OAuth; we use Python’s oauth2 library for this. You need to create your own OAuth parameters and put them in OAUTH_KEYS, see here for more info about how to create these.

In POST_PARAMS we put the parameters that are posted when setting up the connection. By setting stall_warnings to true we will receive warnings if the connection is about to be disconnected because the client is falling behind. This can be a good idea, especially if we are following high-traffic keywords.
In the parameter track we put all the keywords we want to follow in a comma-separated list.

In the method setup_connection we connect to the Streaming endpoint using cURL. We start by making sure there is no open connection already; if there is, we close it and empty the buffer used for saving intermediate tweet data. Then we set the necessary cURL options:

  • URL – URL to the Streaming API endpoint.
  • USER_AGENT – An optional, but recommended, string for identifying your application.
  • ENCODING – By setting it to “deflate, gzip” the stream will be sent in gzipped format which saves a lot of bandwidth.
  • POST – We are going to do a POST so we set this to “1”.
  • POSTFIELDS – These are the data that we are going to post, at a minimum we need to send the keywords we want to track.
  • HTTPHEADER – Host needs to be set to ensure we get a gzipped stream, and Authorization is the OAuth header.
  • WRITEFUNCTION – This method will be called with the data from stream.

Creating the OAuth header

The method get_oauth_header creates and returns the OAuth header needed for authenticating the connection.

def get_oauth_header(self):
    """ Create and return OAuth header.
    """
    params = {'oauth_version': '1.0',
              'oauth_nonce': oauth.generate_nonce(),
              'oauth_timestamp': int(time.time())}
    req = oauth.Request(method='POST', parameters=params, url='%s?%s' % (API_ENDPOINT_URL,
                                                                         urllib.urlencode(POST_PARAMS)))
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.oauth_consumer, self.oauth_token)
    return req.to_header()['Authorization'].encode('utf-8')

We create a request that we sign using our OAuth consumer key and token. Note that the url parameter needs to be exactly the same as the URL and POSTFIELDS parameters that we use in setup_connection. Then we extract and return the resulting header.

Handling connection errors

By calling self.conn.perform() we enter a loop that starts receiving from the stream and sending the data to the method handle_tweet.
This loop runs until we kill the program or something happens to the connection. The connection can be dropped either by a network error or by an HTTP error. Depending on which type of error we receive, Twitter recommends different reconnection strategies: for a network error we should back off linearly, for HTTP errors we should back off exponentially. The following code calls perform and handles connection errors according to Twitter’s recommendations:

def start(self):
    """ Start listening to Streaming endpoint.
    Handle exceptions according to Twitter's recommendations.
    """
    backoff_network_error = 0.25
    backoff_http_error = 5
    backoff_rate_limit = 60
    while True:
        self.setup_connection()
        try:
            self.conn.perform()
        except:
            # Network error, use linear back off up to 16 seconds
            print 'Network error: %s' % self.conn.errstr()
            print 'Waiting %s seconds before trying again' % backoff_network_error
            time.sleep(backoff_network_error)
            backoff_network_error = min(backoff_network_error + 1, 16)
            continue
        # HTTP Error
        sc = self.conn.getinfo(pycurl.HTTP_CODE)
        if sc == 420:
            # Rate limit, use exponential back off starting with 1 minute and double each attempt
            print 'Rate limit, waiting %s seconds' % backoff_rate_limit
            time.sleep(backoff_rate_limit)
            backoff_rate_limit *= 2
        else:
            # HTTP error, use exponential back off up to 320 seconds
            print 'HTTP error %s, %s' % (sc, self.conn.errstr())
            print 'Waiting %s seconds' % backoff_http_error
            time.sleep(backoff_http_error)
            backoff_http_error = min(backoff_http_error * 2, 320)

(Optimally we should reset the backoff values to their defaults after a successful reconnection; that is left as an exercise…)

Processing the tweets

The Streaming API sends data as a series of newline-delimited messages, where newline is considered to be “\r\n” and the messages are JSON-encoded data.

Apart from normal tweets we might also receive various warnings and error messages in the stream, so we need to be prepared for that. You can find a complete list of these messages here. In the code below we will not handle all of these messages, just the most important ones.

def handle_tweet(self, data):
    """ This method is called when data is received through Streaming endpoint.
    """
    self.buffer += data
    if data.endswith('\r\n') and self.buffer.strip():
        # complete message received
        message = json.loads(self.buffer)
        self.buffer = ''
        if message.get('limit'):
            print 'Rate limiting caused us to miss %s tweets' % (message['limit'].get('track'))
        elif message.get('disconnect'):
            raise Exception('Got disconnect: %s' % message['disconnect'].get('reason'))
        elif message.get('warning'):
            print 'Got warning: %s' % message['warning'].get('message')
        else:
            print 'Got tweet with text: %s' % message.get('text')
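
Tying it all together, running the client is just a matter of instantiating the class and calling start:

if __name__ == '__main__':
    stream = TwitterStream()
    stream.start()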

Hopefully this tutorial has given you a basic understanding about how Twitter’s Streaming API works. The complete code for this example is available here.