# technorati.py v0.01 # Copyright (C) 2003 Phillip Pearson # Release notes: http://www.myelin.co.nz/post/2003/5/12/#200305124 # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
"""Small client for the Technorati web API.

Provides the cosmos, bloginfo and outboundblogs calls, a persistent on-disk
response cache, and a command-line front end.

Requires Python 2.6 or 2.7: it depends on ``sgmllib`` and
``urllib.FancyURLopener``, neither of which exists in Python 3.
"""

import ast
import getopt
import sgmllib
import sys
import urllib
from pprint import pprint

# Files are looked up relative to the current working directory.
_KEY_FILE = 'apikey.txt'
_CACHE_FILE = 'cache.txt'


def _read_api_key(path):
    """Return the first line of the file at *path*, stripped of whitespace."""
    with open(path) as f:
        return f.readline().strip()


# The API key is read once, when the module is imported.
_key = _read_api_key(_KEY_FILE)


class opener(urllib.FancyURLopener):
    """URL opener that reports this client's name as its user agent."""

    version = 'Technorati/Python v0.01; http://www.myelin.co.nz/technorati/'


def _load_cache(path):
    """Return the cache dict stored at *path*, or an empty dict.

    The cache file holds the repr() of a dict. It is parsed with
    ast.literal_eval rather than eval so that a tampered or corrupted file
    cannot execute code. A missing, unreadable or malformed file yields an
    empty cache instead of an error (loading is best-effort).
    """
    try:
        with open(path) as f:
            loaded = ast.literal_eval(f.read())
    except (IOError, OSError, SyntaxError, ValueError, TypeError):
        return {}
    if not isinstance(loaded, dict):
        return {}
    return loaded


def _save_cache(path):
    """Write the repr() of the response cache to *path*."""
    with open(path, 'wt') as f:
        f.write(repr(callcache))


# Maps request URL -> raw response body.
callcache = _load_cache(_CACHE_FILE)


class BadUrlError(Exception):
    """Raised when an API call is attempted without a URL argument."""


def call(proc, args):
    """Call API procedure *proc* and return the raw response body.

    *args* is a dict of query parameters and must contain a non-empty
    'url' entry; the API key is added automatically. Responses are cached
    in ``callcache`` by full request URL, so a repeated call does not hit
    the network. Progress is printed to stdout.

    Raises BadUrlError if 'url' is missing, None or empty.
    """
    if args.get('url') in (None, ''):
        raise BadUrlError("No URL supplied")
    # Work on a copy so the caller's dict is not modified.
    params = dict(args)
    params['key'] = _key
    url = 'http://api.technorati.com/%s.xml?%s' % (
        proc, urllib.urlencode(params))
    print("calling %s" % url)
    if url not in callcache:
        print("(fetching)")
        response = opener().open(url)
        try:
            callcache[url] = response.read()
        finally:
            response.close()
    return callcache[url]


def parse(parser, xml):
    """Feed *xml* through *parser* and return the parser's ``data`` dict."""
    parser.feed(xml)
    parser.close()
    return parser.data


class genericParser(sgmllib.SGMLParser):
    """Parser for Technorati API responses.

    Results are accumulated in ``self.data``: each parsed item element is
    appended, as a dict, to the list stored under *itemsName*; fields found
    directly inside the result element are stored as top-level keys.
    """

    def __init__(self, itemsName):
        sgmllib.SGMLParser.__init__(self)
        self.data = {}
        # Flags recording which enclosing elements are currently open.
        self.inresult = self.inweblog = self.initem = 0
        self.weblog = None      # dict for the weblog element being parsed
        self.item = None        # dict for the item element being parsed
        self.data[itemsName] = self.items = []
        self.collector = None   # list of text chunks while collecting

    def _require(self, condition, message):
        """Raise AssertionError(message) unless *condition* is true.

        Raised explicitly (instead of using ``assert``) so the check still
        runs under ``python -O``.
        """
        if not condition:
            raise AssertionError(message)

    def collect(self):
        """Start collecting character data for the current element."""
        self._require(self.collector is None,
                      "already collecting: parse failure!")
        self.collector = []

    def grab(self):
        """Stop collecting and return the text gathered so far."""
        s = "".join(self.collector)
        self.collector = None
        return s

    def handle_data(self, s):
        if self.collector is not None:
            self.collector.append(s)

    def start_document(self, attrs):
        pass

    def end_document(self):
        pass

    def start_result(self, attrs):
        self.inresult = 1

    def end_result(self):
        self.inresult = 0

    def start_item(self, attrs):
        self.initem = 1
        self.item = {}

    def end_item(self):
        self.initem = 0
        self.items.append(self.item)
        self.item = None

    # -- text fields that may only appear inside an item ------------------

    def _start_item_field(self, tag):
        """Begin collecting text for *tag*, which must be inside an item."""
        self._require(self.initem, "%s element outside item" % tag)
        self.collect()

    def start_nearestpermalink(self, attrs):
        self._start_item_field('nearestpermalink')

    def end_nearestpermalink(self):
        self.item['nearestpermalink'] = self.grab()

    def start_excerpt(self, attrs):
        self._start_item_field('excerpt')

    def end_excerpt(self):
        self.item['excerpt'] = self.grab()

    def start_linkcreated(self, attrs):
        self._start_item_field('linkcreated')

    def end_linkcreated(self):
        self.item['linkcreated'] = self.grab()

    # -- weblog element ---------------------------------------------------

    def start_weblog(self, attrs):
        self._require(self.initem or self.inresult,
                      "found weblog element outside item or result")
        self.inweblog = 1
        self.weblog = {}

    def end_weblog(self):
        self.inweblog = 0
        # A weblog belongs to the enclosing item if there is one, otherwise
        # to the result as a whole.
        if self.initem:
            self.item['weblog'] = self.weblog
        elif self.inresult:
            self.data['weblog'] = self.weblog
        else:
            raise AssertionError("weblog element not in item or result...?")
        self.weblog = None

    def start_url(self, attrs):
        self.collect()

    def end_url(self):
        if self.inweblog:
            self.weblog['url'] = self.grab()
        else:
            self.data['url'] = self.grab()

    def start_name(self, attrs):
        self.collect()

    def end_name(self):
        self.weblog['name'] = self.grab()

    def start_rssurl(self, attrs):
        self.collect()

    def end_rssurl(self):
        self.weblog['rssurl'] = self.grab()

    def start_lastupdate(self, attrs):
        self.collect()

    def end_lastupdate(self):
        self.weblog['lastupdate'] = self.grab()

    # -- integer counters, valid inside a weblog or directly in result ----

    def _counter_target(self, tag):
        """Return the dict that should receive the counter named *tag*."""
        if self.inweblog:
            return self.weblog
        if self.inresult:
            return self.data
        raise AssertionError("%s element not in weblog or result" % tag)

    def start_inboundblogs(self, attrs):
        self.collect()

    def end_inboundblogs(self):
        target = self._counter_target('inboundblogs')
        target['inboundblogs'] = int(self.grab())

    def start_inboundlinks(self, attrs):
        self.collect()

    def end_inboundlinks(self):
        target = self._counter_target('inboundlinks')
        target['inboundlinks'] = int(self.grab())


def getCosmos(url):
    """Fetch the cosmos for the blog at *url*.

    Returns the parsed response as a dict. The list under 'inbound' holds
    one dict per item element; a weblog element found directly inside the
    result is stored under 'weblog'.
    """
    xml = call('cosmos', {'url': url})
    return parse(genericParser('inbound'), xml)


def getBlogInfo(url):
    """Fetch info about the blog at *url*.

    Returns the dict parsed from the result's weblog element, or None if
    the response contained none.
    """
    xml = call('bloginfo', {'url': url})
    data = parse(genericParser('weblogs'), xml)
    return data.get('weblog', None)


def getOutboundBlogs(url):
    """Fetch the blogs linked to by the blog at *url*.

    Returns the parsed response as a dict. The list under 'outbound' holds
    one dict per item element; a weblog element found directly inside the
    result is stored under 'weblog'.
    """
    xml = call('outboundblogs', {'url': url})
    return parse(genericParser('outbound'), xml)


def test(url):
    """Pretty-print the result of every API call for *url*."""
    pprint(getCosmos(url))
    pprint(getBlogInfo(url))
    pprint(getOutboundBlogs(url))


def main():
    """Command-line entry point.

    Options: -u/--url URL selects the blog to query; exactly one of
    -t/--test, --inbound, --info or --outbound selects what to do.
    """
    opts, _ = getopt.getopt(
        sys.argv[1:], 'tu:',
        ('test', 'inbound', 'info', 'outbound', 'url='))
    url = 'http://topicexchange.com/'
    func = None
    lookups = {
        'inbound': getCosmos,
        'info': getBlogInfo,
        'outbound': getOutboundBlogs,
    }
    for opt, val in opts:
        if opt in ('-u', '--url'):
            url = val
        elif opt in ('-t', '--test'):
            func = test
        elif opt.startswith('--') and opt[2:] in lookups:
            if func is not None:
                raise AssertionError(
                    "Only one function (test, inbound, info or outbound) "
                    "may be supplied")
            func = lookups[opt[2:]]

    if func is None:
        print("No function supplied; --test, --inbound, --info or "
              "--outbound must be specified on the command line")
        return

    if func is test:
        # test() prints its own results and returns nothing.
        test(url)
    else:
        pprint(func(url))


if __name__ == '__main__':
    try:
        main()
    finally:
        # Persist fetched responses even if main() failed part-way.
        _save_cache(_CACHE_FILE)