Collecting geo-tagged tweets with the Twitter Streaming API and storing them in a database

One research project I’m working on uses Twitter data to predict crime patterns. The first thing I need to do is collect Twitter data. Specifically, since I’m interested in discovering the spatial patterns of crime, only geo-tagged tweets are collected. Depending on the purpose of your own project, you might need the official Twitter REST API if you want to search for specific sets of tweets, or the official Twitter Streaming API if you want to collect tweets in real time. The Streaming API is quite different from the REST API: with the REST API you pull data from Twitter, whereas the Streaming API pushes messages to a persistent session. In this blog post I’m going to discuss how to collect Twitter messages using the Twitter Streaming API. In the next post, I’ll talk about using the Twitter REST API to collect tweets.

The Python package I’m using is Tweepy. The collected Twitter messages will then be stored in a database, either MySQL or MongoDB. The full code is hosted on my GitHub repository.

Tweepy

Tweepy is a Python package that makes it easier to work with the official Twitter API. It’s essentially a Python wrapper that handles the communication between your own program and the Twitter API. Let’s go straight to the code snippets.

The first thing we need to do is to register the client application with Twitter. Log in to Twitter Apps with your Twitter account and create a new application. Once you are done you should have your consumer key, consumer secret, access token and access token secret. Now, we import the packages and define the keys and access tokens.

import os,sys,re
from dateutil import parser
import json
import tweepy
import MySQLdb
from pymongo import MongoClient
CONSUMER_KEY = "_my_consumer_key_"
CONSUMER_SECRET = "_my_consumer_secret_"
ACCESS_TOKEN = "_my_access_token_"
ACCESS_SECRET = "_my_access_secret_"
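
If you would rather not hard-code the credentials (for example, before pushing the script to GitHub), one option is to read them from environment variables instead. A minimal sketch using the os module imported above (the variable names are just examples):

# Read the credentials from environment variables exported in your shell,
# e.g. export TWITTER_CONSUMER_KEY="..."
CONSUMER_KEY = os.environ.get("TWITTER_CONSUMER_KEY")
CONSUMER_SECRET = os.environ.get("TWITTER_CONSUMER_SECRET")
ACCESS_TOKEN = os.environ.get("TWITTER_ACCESS_TOKEN")
ACCESS_SECRET = os.environ.get("TWITTER_ACCESS_SECRET")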

Next, we create a MyStreamListener class. This class will later be used to create a tweepy.Stream object and connect to the Twitter Streaming API. We define the on_connect(), on_data() and on_error() methods. The parent tweepy.StreamListener class has already defined these methods; we override the defaults to add our own logic.

class MyStreamListener(tweepy.StreamListener):

    def on_connect(self):
        print '......Connected to Twitter Streaming API...... \n'

    def on_data(self, raw_data):
        try:
            data = json.loads(raw_data)  # decode the JSON object from Twitter
            if data['coordinates'] or data['geo']:  # collect geo-tagged tweets only
                created_at = parser.parse(data['created_at'], ignoretz=True)  # tweet posted at UTC time
                text = data['text']
                id_str = data['id_str']
                user_id = data['user']['id_str']
                user_name = data['user']['screen_name']
                lat = str(data['coordinates']['coordinates'][1])
                lon = str(data['coordinates']['coordinates'][0])
                lang = data['user']['lang']
                print '@%s' % user_name
                print 'Tweeted at %s UTC' % created_at
                print text
                print 'Lat: %s' % lat
                print 'Lon: %s \n' % lon
        except Exception as e:
            print e

    def on_error(self, status_code):
        if status_code == 420:  # we are being rate limited; returning False disconnects the stream
            return False
        else:  # keep listening if other errors occur
            print 'An Error has occurred: ' + repr(status_code)
            return True

The on_connect() method is invoked once a successful response is received from the server. When the connection is established and raw data arrives, on_data() is called. The if condition ensures that only tweets carrying coordinate information are collected. The received tweet object is in JSON format, so we first use json.loads() to decode it into a Python object. The tweet object has a long list of attributes; we are only interested in a few of them, which we print to the terminal.
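
For reference, the fields on_data() reads from a geo-tagged tweet look roughly like the following (a trimmed, made-up example). Note that the GeoJSON coordinates field lists longitude first and latitude second, which is why the code above takes index 1 for lat and index 0 for lon:

# A trimmed, made-up geo-tagged tweet showing only the fields used above
sample = {
    "created_at": "Tue Mar 20 15:30:00 +0000 2018",
    "id_str": "975000000000000000",
    "text": "Example geo-tagged tweet",
    "coordinates": {"type": "Point", "coordinates": [-90.07, 29.95]},  # [longitude, latitude]
    "user": {"id_str": "12345678", "screen_name": "example_user", "lang": "en"}
}
print sample['coordinates']['coordinates'][1]  # latitude: 29.95
print sample['coordinates']['coordinates'][0]  # longitude: -90.07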

The on_error() method is called when a non-200 status code is returned. HTTP status codes are issued by a server in response to a client’s request; a successful request returns status code 200. A special case when using the Twitter API is rate limiting: Twitter limits the number of requests a client can make during a given time window, and the API sends a 420 status code if we’re being rate limited.
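
Returning False stops the stream entirely. If you want the collector to keep running unattended, one common pattern (not part of the code above) is to wrap the connection call in a retry loop that waits before reconnecting. A rough sketch, assuming the streamer object created in the final section:

import time

def run_stream_with_backoff(streamer, locations, wait_seconds=60):
    # Keep the stream alive; pause before reconnecting after a disconnect or error
    while True:
        try:
            streamer.filter(locations=locations)
        except Exception as e:
            print 'Stream error: %s' % e
        print 'Stream disconnected, reconnecting in %d seconds' % wait_seconds
        time.sleep(wait_seconds)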

Database

Normally, we don’t just want to print the collected tweets to the terminal screen; we also want to store them for later analysis. Of course, you could dump all the collected tweets into a single file, but a more efficient and appropriate choice is to store them in a database.
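
For completeness, dumping each raw tweet to a file from inside on_data() only takes a few lines. A minimal sketch (the file name is arbitrary; one JSON object per line):

def file_store(raw_data):
    # Append the raw JSON string received by on_data() to a text file
    with open('collected_tweets.json', 'a') as f:
        f.write(raw_data.strip() + '\n')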

MySQL

Let’s first look at how to store the collected tweets in MySQL. We need a SQL connector to connect to a MySQL database from Python. I use the MySQLdb package, but you are free to use an alternative. The first thing we need to do is install MySQL; check this post I wrote before about how to install and set up MySQL on a Mac. Then we need to install the MySQLdb package.
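
On Python 2, MySQLdb is published on PyPI as MySQL-python (the maintained fork mysqlclient provides the same MySQLdb module and works as well), so installing it should look something like:

pip install MySQL-python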

Before we import MySQLdb in our Python program, we should create a database and a table first. Add a database called twitter:

mysql> CREATE DATABASE twitter;

A twitter database is created. Switch to it with the USE command:

mysql> USE twitter;

Then, create a table called twitter_stream_collect which we are going to use to store the data.

CREATE TABLE `twitter_stream_collect` (
  `created_at` timestamp NULL DEFAULT NULL,
  `text` varchar(560) DEFAULT NULL,
  `id_str` varchar(30) NOT NULL,
  `user_id` varchar(30) NOT NULL,
  `user_name` varchar(80) NOT NULL,
  `lat` varchar(20) DEFAULT NULL,
  `lon` varchar(20) DEFAULT NULL,
  `lang` char(2) DEFAULT NULL,
  PRIMARY KEY (`id_str`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
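
One caveat worth noting: MySQL’s utf8 charset only covers characters up to three bytes, so tweets containing emoji or other 4-byte characters can make the INSERT fail. If you run into that, one fix is to convert the table to utf8mb4 and pass charset="utf8mb4" to MySQLdb.connect():

ALTER TABLE `twitter_stream_collect` CONVERT TO CHARACTER SET utf8mb4;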

Now, we define a method to create a connection to the twitter database, execute and commit the query.

HOST = "_name_of_host_"  # Use "localhost" to store data on your local computer
USER = "_name_of_user_"  # Use "root" if you connect to MySQL as the superuser root
PASSWD = "_user_password_"  # Use your root password
DATABASE = "_database_to_connect_"  # In our example it's "twitter"

def mysql_store(created_at, text, id_str, user_id, user_name, lat, lon, lang):
    db = MySQLdb.connect(host=HOST, user=USER, passwd=PASSWD, db=DATABASE, charset="utf8")
    cursor = db.cursor()
    insert_query = "INSERT INTO twitter_stream_collect (created_at, text, id_str, user_id, user_name, lat, lon, lang) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    cursor.execute(insert_query, (created_at, text, id_str, user_id, user_name, lat, lon, lang))
    db.commit()
    cursor.close()
    db.close()
    return

MongoDB

Another option is MongoDB. Unlike MySQL, MongoDB is a NoSQL database: it stores data in flexible, JSON-like documents, so you don’t have to define a schema before using the database.
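
Because MongoDB doesn’t enforce a schema, you could in principle store the whole decoded tweet rather than picking out individual fields. A minimal sketch (the collection name tweet_collect_raw is made up for this illustration; the database name matches the one used below):

import json
from pymongo import MongoClient

def mongodb_store_raw(raw_data):
    # Insert the full decoded tweet document; no fields need to be defined in advance
    client = MongoClient('mongodb://localhost')
    db = client.twitter_collect_db
    db.tweet_collect_raw.insert_one(json.loads(raw_data))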

We define a method to create a database, connect to it, and store data into it:

MONGO_HOST = "_name_of_host_"  # Use 'mongodb://localhost/twitter_collect_db' to store data in a local database called twitter_collect_db

def mongodb_store(created_at, text, id_str, user_id, user_name, lat, lon, lang):
    client = MongoClient(MONGO_HOST)
    db = client.twitter_collect_db  # the database is created on first use
    data_mongo = {}
    data_mongo['id_str'] = id_str
    data_mongo['user_name'] = user_name
    data_mongo['user_id'] = user_id
    data_mongo['tweeted_at'] = created_at
    data_mongo['text'] = text
    data_mongo['lat'] = lat
    data_mongo['lon'] = lon
    data_mongo['lang'] = lang
    db.tweet_collect.insert_one(data_mongo)
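
Once tweets are flowing in, you can read them back with pymongo for later analysis. A quick sketch against the same local database:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost')
db = client.twitter_collect_db
print db.tweet_collect.count()  # number of tweets collected so far
for doc in db.tweet_collect.find({'lang': 'en'}).limit(5):
    print doc['user_name'], doc['text']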

Both the mysql_store() and mongodb_store() methods are invoked inside on_data(). Check my GitHub repository for the full code.
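
For reference, the calls inside on_data() would look roughly like this (a sketch; the exact wiring in the repository may differ):

# Inside MyStreamListener.on_data(), after the fields have been extracted:
mysql_store(created_at, text, id_str, user_id, user_name, lat, lon, lang)
mongodb_store(created_at, text, id_str, user_id, user_name, lat, lon, lang)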

Run collecting

The final step is to authenticate with our keys and access tokens, instantiate the MyStreamListener class, connect to the Twitter Streaming API, and filter the collected tweets with a location filtering criterion.

if __name__ == '__main__':
    LOCATIONS = [WEST, SOUTH, EAST, NORTH]  # the coordinates of the bounding box
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    api = tweepy.API(wait_on_rate_limit_notify=True)
    listener = MyStreamListener(api=api)
    streamer = tweepy.Stream(auth=auth, listener=listener)
    print '......Collecting geo-tagged tweets......'
    streamer.filter(locations=LOCATIONS)
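
Here WEST, SOUTH, EAST and NORTH are placeholders for your own study area; the locations parameter expects the bounding box as [west_lon, south_lat, east_lon, north_lat]. As an illustration only, a rough bounding box around New Orleans might look like:

# Approximate bounding box around New Orleans (illustrative values only)
LOCATIONS = [-90.14, 29.85, -89.90, 30.10]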

Now you can collect Twitter streaming data in real time and store it in a database.

Written by Shuzhan Fan on Mar 20, 2018