Collecting geo-tagged tweets using the Twitter Streaming API and a database
One research project I'm working on uses Twitter data to predict crime patterns. So, the first thing I need to do is collect Twitter data. Specifically, since I'm interested in discovering the spatial patterns of crime, only geo-tagged tweets are collected. Depending on the purpose of your own project, you might need the official Twitter REST API if you want to search for specific sets of tweets, or the official Twitter Streaming API if you want to collect tweets in real time. The Streaming API is quite different from the REST API: the REST API is used to pull data from Twitter, while the Streaming API pushes messages to a persistent session. In this blog post I'm going to discuss how to collect Twitter messages using the Twitter Streaming API. In the next post, I'll talk about using the Twitter REST API to collect tweets.
The Python package I'm using is Tweepy. The collected Twitter messages will then be stored in a database, either MySQL or MongoDB. The full code is hosted in my GitHub repository.
Tweepy
Tweepy is a Python package that makes it easier to work with the official Twitter API. It's essentially a Python wrapper that handles the communication between your own program and the Twitter API. Let's go straight to the code snippets.
The first thing we need to do is register the client application with Twitter. Log in to Twitter Apps with your Twitter account and create a new application. Once you are done, you should have your consumer key, consumer secret, access token and access token secret. Now we import the packages and define the keys and access tokens.
import os, sys, re
from dateutil import parser
import json
import tweepy
import MySQLdb
from pymongo import MongoClient

CONSUMER_KEY = "_my_consumer_key_"
CONSUMER_SECRET = "_my_consumer_secret_"
ACCESS_TOKEN = "_my_access_token_"
ACCESS_SECRET = "_my_access_secret_"
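Hard-coding credentials works for a quick test, but it's safer to keep them out of source control. As a minimal sketch, assuming you export variables such as TWITTER_CONSUMER_KEY in your shell (the names here are my own, not anything Twitter requires), you could read them from the environment instead:

# Hypothetical environment variable names; adjust to your own setup.
CONSUMER_KEY = os.environ.get("TWITTER_CONSUMER_KEY")
CONSUMER_SECRET = os.environ.get("TWITTER_CONSUMER_SECRET")
ACCESS_TOKEN = os.environ.get("TWITTER_ACCESS_TOKEN")
ACCESS_SECRET = os.environ.get("TWITTER_ACCESS_SECRET")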
Next, we create a MyStreamListener class. This class will later be used to create a tweepy.Stream object and connect to the Twitter Streaming API. We define the on_connect(), on_data() and on_error() methods. The parent tweepy.StreamListener class has already defined these methods; we override the defaults to add our own logic.
class MyStreamListener(tweepy.StreamListener):

    def on_connect(self):
        print('......Connected to Twitter Streaming API...... \n')

    def on_data(self, raw_data):
        try:
            data = json.loads(raw_data)  # decode the JSON object from Twitter
            if data['coordinates'] or data['geo']:  # collect geo-tagged tweets only
                created_at = parser.parse(data['created_at'], ignoretz=True)  # tweet posted at UTC time
                text = data['text']
                id_str = data['id_str']
                user_id = data['user']['id_str']
                user_name = data['user']['screen_name']
                lat = str(data['coordinates']['coordinates'][1])
                lon = str(data['coordinates']['coordinates'][0])
                lang = data['user']['lang']
                print('@%s' % user_name)
                print('Tweeted at %s UTC' % created_at)
                print(text)
                print('Lat: %s' % lat)
                print('Lon: %s \n' % lon)
        except Exception as e:
            print(e)

    def on_error(self, status_code):
        if status_code == 420:  # returning False disconnects the stream
            return False
        else:  # continue listening if other errors occur
            print('An error has occurred: ' + repr(status_code))
            return True
The on_connect() method is invoked once a successful response is received from the server. When the connection is established and raw data arrives, on_data() is called. The if condition ensures that only tweets with coordinate information are kept. The received tweet object is in JSON format, so we first use json.loads() to decode it into a Python object. The tweet object has a long list of attributes; we are only interested in a few of them, which we print to the terminal.
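To make the indexing above concrete, here is a trimmed illustration of the decoded structure (the field values are made up). Note that Twitter's coordinates field follows GeoJSON conventions, so longitude comes before latitude, which is why index 1 is the latitude:

# A simplified, made-up excerpt of one decoded tweet object:
data = {
    'created_at': 'Wed Oct 10 20:19:24 +0000 2018',
    'id_str': '1050118621198921728',
    'text': 'Example tweet text',
    'coordinates': {
        'type': 'Point',
        'coordinates': [-73.99, 40.73],  # [longitude, latitude] -- GeoJSON order
    },
    'user': {'id_str': '12345', 'screen_name': 'example_user', 'lang': 'en'},
}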
The on_error() method is called when a non-200 status code is returned. HTTP status codes are issued by a server in response to a client's request; a successful HTTP request returns status code 200. A special case with the Twitter API is rate limiting: Twitter limits the number of requests a user can make during a specific time window, and the Streaming API sends a 420 status code if we're being rate limited.
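If you would rather back off and reconnect than disconnect for good, one option is to sleep for an increasing interval before returning True. This is a sketch of my own, not part of the original code:

import time

class BackoffStreamListener(MyStreamListener):
    """A hypothetical variant that backs off on rate limits instead of disconnecting."""

    def __init__(self, api=None):
        super(BackoffStreamListener, self).__init__(api=api)
        self.backoff = 60  # start with a one-minute wait

    def on_error(self, status_code):
        if status_code == 420:  # rate limited: wait, then keep the stream alive
            time.sleep(self.backoff)
            self.backoff *= 2  # double the wait on each consecutive 420
            return True
        print('An error has occurred: ' + repr(status_code))
        return True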
Database
Normally, we don't just want to print the collected tweets to the terminal; we also want to store them for later analysis. You could simply append all the collected tweets to a single file, but a more efficient and appropriate choice is to store them in a database.
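For completeness, here is what the file option might look like: a minimal sketch, assuming an arbitrary local file named tweets.jsonl, that appends each raw tweet as one JSON line from inside on_data().

# A minimal file-based alternative: append each raw tweet as one JSON line.
# 'tweets.jsonl' is a file name chosen for this example.
def file_store(raw_data):
    with open('tweets.jsonl', 'a') as f:
        f.write(raw_data.strip() + '\n')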
MySQL
Let's first look at how to store collected tweets in MySQL. We need a SQL connector to talk to a MySQL database from Python. I use the MySQLdb package, but you are free to use an alternative. The first thing we need to do is install MySQL; check the post I wrote before about how to install and set up MySQL on a Mac. Then we need to install the MySQLdb package.
Before we import MySQLdb in our Python program, we should create a database and a table. Add a database called twitter:
mysql> CREATE DATABASE twitter;
A twitter database is created. Switch to it with the USE command:
mysql> USE twitter;
Then, create a table called twitter_stream_collect, which we are going to use to store the data.
CREATE TABLE `twitter_stream_collect` (
    `created_at` timestamp NULL DEFAULT NULL,
    `text` varchar(560) DEFAULT NULL,
    `id_str` varchar(30) NOT NULL,
    `user_id` varchar(30) NOT NULL,
    `user_name` varchar(80) NOT NULL,
    `lat` varchar(20) DEFAULT NULL,
    `lon` varchar(20) DEFAULT NULL,
    `lang` char(2) DEFAULT NULL,
    PRIMARY KEY (`id_str`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
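One caveat worth knowing: MySQL's utf8 charset only stores characters up to three bytes, so four-byte characters such as emoji (common in tweets) will be rejected; utf8mb4 is the safer choice if you want to keep them. If you prefer to create the table from Python rather than the mysql shell, a minimal sketch could look like this (the connection values are placeholders mirroring the settings defined in the next snippet):

# A sketch: create the table from Python instead of the mysql shell.
# Connection values are placeholders; utf8mb4 is used so emoji survive.
db = MySQLdb.connect(host="localhost", user="root", passwd="_user_password_", db="twitter")
cursor = db.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS `twitter_stream_collect` (
        `created_at` timestamp NULL DEFAULT NULL,
        `text` varchar(560) DEFAULT NULL,
        `id_str` varchar(30) NOT NULL,
        `user_id` varchar(30) NOT NULL,
        `user_name` varchar(80) NOT NULL,
        `lat` varchar(20) DEFAULT NULL,
        `lon` varchar(20) DEFAULT NULL,
        `lang` char(2) DEFAULT NULL,
        PRIMARY KEY (`id_str`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
db.commit()
cursor.close()
db.close()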
Now, we define a method that connects to the twitter database and executes and commits the insert query.
HOST = "_name_of_host_"             # use "localhost" to store data on your local computer
USER = "_name_of_user_"             # use "root" if you connect to MySQL as the superuser root
PASSWD = "_user_password_"          # your MySQL password
DATABASE = "_database_to_connect_"  # in our example it's "twitter"

def mysql_store(created_at, text, id_str, user_id, user_name, lat, lon, lang):
    db = MySQLdb.connect(host=HOST, user=USER, passwd=PASSWD, db=DATABASE, charset="utf8")
    cursor = db.cursor()
    insert_query = ("INSERT INTO twitter_stream_collect "
                    "(created_at, text, id_str, user_id, user_name, lat, lon, lang) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")
    cursor.execute(insert_query, (created_at, text, id_str, user_id, user_name, lat, lon, lang))
    db.commit()
    cursor.close()
    db.close()
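Two caveats with this method: the stream occasionally delivers the same tweet twice, and since id_str is the primary key, a duplicate insert raises an IntegrityError; it also opens a new connection per tweet, which is simple but slow. A hedged variant of my own (not from the original repository) that skips duplicates instead of crashing:

def mysql_store_safe(created_at, text, id_str, user_id, user_name, lat, lon, lang):
    db = MySQLdb.connect(host=HOST, user=USER, passwd=PASSWD, db=DATABASE, charset="utf8")
    cursor = db.cursor()
    try:
        insert_query = ("INSERT INTO twitter_stream_collect "
                        "(created_at, text, id_str, user_id, user_name, lat, lon, lang) "
                        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")
        cursor.execute(insert_query, (created_at, text, id_str, user_id, user_name, lat, lon, lang))
        db.commit()
    except MySQLdb.IntegrityError:
        db.rollback()  # duplicate id_str: skip this tweet
    finally:
        cursor.close()
        db.close()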
MongoDB
Another database option is MongoDB. Unlike MySQL, MongoDB is a NoSQL database: it stores data in flexible, JSON-like documents, and you don't have to define a schema before using the database.
We define a method that connects to the database (MongoDB creates it automatically on first use) and stores the data in it:
MONGO_HOST = "_name_of_host_"  # use "mongodb://localhost/twitter_collect_db" to store data in a local database called twitter_collect_db

def mongodb_store(created_at, text, id_str, user_id, user_name, lat, lon, lang):
    client = MongoClient(MONGO_HOST)
    db = client.twitter_collect_db
    data_mongo = {}
    data_mongo['id_str'] = id_str
    data_mongo['user_name'] = user_name
    data_mongo['user_id'] = user_id
    data_mongo['created_at'] = created_at
    data_mongo['text'] = text
    data_mongo['lat'] = lat
    data_mongo['lon'] = lon
    data_mongo['lang'] = lang
    db.tweet_collect.insert_one(data_mongo)
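Since the goal is spatial analysis, it may also be worth storing the coordinates as a GeoJSON point, which lets MongoDB answer spatial queries through a 2dsphere index. A sketch of my own (the field name location and the query coordinates are made up for illustration):

import pymongo

# Store a GeoJSON point (longitude first) alongside the flat lat/lon fields.
data_mongo['location'] = {'type': 'Point', 'coordinates': [float(lon), float(lat)]}

# Create the geospatial index once; repeated calls are harmless.
db.tweet_collect.create_index([('location', pymongo.GEOSPHERE)])

# Example spatial query: tweets within 1 km of a point.
nearby = db.tweet_collect.find({
    'location': {
        '$near': {
            '$geometry': {'type': 'Point', 'coordinates': [-73.99, 40.73]},
            '$maxDistance': 1000,  # meters
        }
    }
})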
Both the mysql_store() and mongodb_store() methods are invoked inside on_data(). Check my GitHub repository for the full code.
Running the collector
The final step is to authenticate with our keys and access tokens, instantiate the MyStreamListener class, connect to the Twitter Streaming API, and filter the collected tweets with a location-based criterion.
if __name__ == '__main__':
    LOCATIONS = [WEST, SOUTH, EAST, NORTH]  # the coordinates of the bounding area
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    api = tweepy.API(wait_on_rate_limit_notify=True)
    listener = MyStreamListener(api=api)
    streamer = tweepy.Stream(auth=auth, listener=listener)
    print('......Collecting geo-tagged tweets......')
    streamer.filter(locations=LOCATIONS)
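WEST, SOUTH, EAST and NORTH are placeholders for your study area's bounding box, given in the order the Streaming API expects: south-west corner first, with longitude before latitude. As a hypothetical example with approximate values for New York City:

# Approximate New York City bounding box (illustrative values only):
WEST, SOUTH, EAST, NORTH = -74.26, 40.48, -73.70, 40.92
LOCATIONS = [WEST, SOUTH, EAST, NORTH]  # [sw_lon, sw_lat, ne_lon, ne_lat]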
Now you can collect Twitter streaming data in real time and store it in a database.