# !/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
from os.path import expanduser
import sqlite3
import time
import sys
from threading import Thread
import libtorrent as lt
from bitween.components.pubsub import Subscriber
from bitween.components.models import contact_shares
from bitween.components.models import handles
from . import logger
from bitween.components.models import config
from .helpers import utf8_encoded
[docs]class BitTorrentClient(Thread, Subscriber):
"""
BitTorrent Client Class
"""
def __init__(self):
"""
initialize a new BitTorrent Client to run in a Thread
start with .start()
"""
Thread.__init__(self)
Subscriber.__init__(self, autosubscribe=True)
self.statdb = os.path.join(expanduser("~"), '.bitween.db')
self.setup_db()
self.session = lt.session()
self.session.set_alert_mask(lt.alert.category_t.all_categories)
logger.info('libtorrent %s' % lt.version)
self.handles = handles
"""-----alert categories-----
error_notification
peer_notification
port_mapping_notification
storage_notification
tracker_notification
debug_notification
status_notification
progress_notification
ip_block_notification
performance_warning
stats_notification
dht_notification
rss_notification
all_categories
"""
self.end = False
self.setup_settings()
conf = config.conf
logger.debug('conf: %s' % conf)
if conf.get('bt', False):
if conf['bt'].get('ports', False):
ports = conf['bt'].get('ports')
try:
self.session.listen_on(ports[0], ports[1])
except Exception as e:
logger.error('could not set ports for BT: %s' % e)
if conf['bt'].get("enable_upnp", False):
self.session.start_upnp()
else:
logger.info('running without upnp')
self.session.stop_upnp()
if conf['bt'].get("enable_natpmp", False):
self.session.start_natpmp()
else:
logger.info('running without natpmp')
self.session.stop_natpmp()
self.session.stop_dht()
# self.session.stop_lsd()
self.resume() # load torrents from db
self.name = 'bt'
self.publish('bt_ready')
[docs] def setup_settings(self):
"""
called by the init method. sets up some sane settings.
:return:
"""
# settings
# pesettings = lt.pe_settings()
# pesettings.in_enc_policy = lt.enc_policy.forced
# pesettings.out_enc_policy = lt.enc_policy.forced
# pesettings.allowed_enc_level = lt.enc_level.rc4
# self.session.set_pe_settings(pesettings)
session_settings = lt.session_settings()
self.session.set_settings(session_settings)
# extensions
self.session.add_extension(lt.create_metadata_plugin) # Allows peers to download the metadata (.torren files) from the swarm directly. Makes it possible to join a swarm with just a tracker and info-hash.
self.session.add_extension(lt.create_ut_metadata_plugin) # same, utorrent compatible
# self.session.add_extension(lt.create_ut_pex_plugin) # Exchanges peers between clients.
# self.session.add_extension(
# lt.create_smart_ban_plugin) # A plugin that, with a small overhead, can ban peers that sends bad data with very high accuracy. Should eliminate most problems on poisoned torrents.
[docs] def setup_db(self):
"""
called by the init method.
sets up the sqlite database if there is none.
:return:
"""
dbfile = self.statdb
if not os.path.exists(dbfile):
conn = sqlite3.connect(dbfile)
c = conn.cursor()
c.execute(
"CREATE TABLE torrents (magnetlink VARCHAR PRIMARY KEY, torrent BLOB, status BLOB, save_path VARCHAR)")
conn.commit()
conn.close()
def __del__(self):
logger.info("torrentsession exits!")
[docs] def on_exit(self):
"""
trigger a safe shutdown of the thread
:return:
"""
self.safe_shutdown()
[docs] def safe_shutdown(self):
"""
sets the end variable which tells the run method to jump out
of the handling loop and trigger the save_handle for each handle.
:return:
"""
logger.info('triggering safe shutdown')
self.end = True
[docs] def handle_queue(self):
"""
handle the IPC Message queue
:return:
"""
if self.has_messages():
topic, args, kwargs = self.get_message()
try:
f = getattr(self, 'on_%s' % topic)
f(*args, **kwargs)
except Exception as e:
logger.error('something went wrong when calling on_%s: %s' % (topic, e))
[docs] def handle_alert(self, alert):
"""
Handle the libtorrent Alerts
:param alert:
:return:
"""
# http://www.libtorrent.org/reference-Alerts.html
if alert.what() == "torrent_update_alert":
self.publish('publish_shares')
elif alert.what() == "state_update_alert":
self.publish('publish_shares')
elif alert.what() == "file_error_alert":
logger.error("FILE ERROR: %s" % alert.error)
self.session.remove_torrent(alert.handle)
self.handles.remove(alert.handle)
self.publish('publish_shares')
elif (alert.what() == "stats_alert"):
pass
elif (alert.what() == "listen_failed_alert"):
logger.error('failed to listen on interface %s: %s' % (alert.listen_interface(), alert.message()))
# elif alert.what() == "external_ip_alert": # todo
# ip = alert.external_address
# logger.info('got our ip: %s' % ip)
# self.publish('set_ip_address', ip) # todo
elif alert.what() == "portmap_alert":
# http://www.rasterbar.com/products/libtorrent/manual.html#portmap-alert
# This alert is generated when a NAT router was successfully found and a port was successfully mapped on it. On a NAT:ed network with a NAT-PMP capable router, this is typically generated once when mapping the TCP port and, if DHT is enabled, when the UDP port is mapped.
self.publish('set_port', alert.external_port)
elif alert.what() == "metadata_received_alert":
# send shares when we have enough data to tell someone about it
self.publish('publish_shares')
elif alert.what() == "portmap_error_alert":
logger.error('portmap error: %s' % alert.error)
elif (alert.what() == "save_resume_data_alert"):
handle = alert.handle
self.save(handle, alert.resume_data)
logger.debug("removing %s at %s" % (handle.name(), handle))
self.session.remove_torrent(handle)
# print(self.session.wait_for_alert(1000))
self.handles.remove(handle)
elif (alert.what() == "save_resume_data_failed_alert"):
handle = alert.handle
logger.error("failed removing %s" % handle.name())
self.session.remove_torrent(handle)
self.handles.remove(handle)
else:
logging.debug('alert: %s - %s' % (alert.what(), alert.message()))
[docs] def run(self):
"""
the run method of the thread.
this will process all the handles and messages in the input_queue.
when safe_shutdown is called, it will trigger save_resume_data for every handle and wait for the alert
with the resume_data.
when every handle is deleted (after saving each resume_data), the session will be saved;
were done and the thread will exit.
"""
logger.debug("BT running ob port %s" % self.session.listen_port())
while not self.end:
self.handle_queue()
for alert in self.session.pop_alerts():
self.handle_alert(alert)
time.sleep(1)
logger.info("ending")
# ending - save stuff
# erase previous torrents first
self.erase_all_torrents_from_db()
# then trigger saving resume data
for handle in self.handles:
logger.debug('asking for resume data for %s' % handle.get_torrent_info().name())
handle.save_resume_data(lt.save_resume_flags_t.flush_disk_cache)
# set alert mast to get the right alerts
#self.session.set_alert_mask(lt.alert.category_t.storage_notification)
# wait for everything to save and finish!
while self.handles:
# logger.debug(self.handles.list)
for alert in self.session.pop_alerts():
logger.debug("- %s %s" % (alert.what(), alert.message()))
self.handle_alert(alert)
time.sleep(1)
logger.info("handles at return: %s" % self.handles)
return
[docs] def on_recheck_handles(self):
"""
recheck all handles, in case we could have new endpoints
should be triggered by the xmpp client if we got new shares
:return:
"""
for handle in self.handles:
addr_tuples = contact_shares.hashes.get(str(handle.info_hash()), [])
for addr_tuple in addr_tuples:
try:
if addr_tuple not in [peer_info.ip for peer_info in handle.get_peer_info()]:
logger.info('connecting to %s:%s' % (addr_tuple[0], addr_tuple[1]))
handle.connect_peer((addr_tuple[0], int(addr_tuple[1])), 0)
except Exception as e:
logger.error('recheck_handles: cant connect to %s:%s: %s' %
(addr_tuple[0], addr_tuple[1], e))
[docs] def on_add_hash(self, sha_hash, save_path):
"""
add a hash to the torrent session
used to download new stuff.
after creation of the torrent all addresses that we have collected for the hash will be added
:param sha_hash: hash of the torrent
:type sha_hash: str
:param save_path: path to save
:type save_path: str
:return:
"""
mlink = 'magnet:?xt=urn:btih:%s' % sha_hash
params = {
'save_path': save_path,
'duplicate_is_error': True
}
logger.debug('adding new handle by magnetlink')
handle = lt.add_magnet_uri(self.session, utf8_encoded(mlink), params)
self.handles.append(handle)
logger.debug('adding peers to handle...')
addr_tuples = contact_shares.hashes.get(sha_hash, [])
if not addr_tuples:
logger.error('no addresses for %s' % sha_hash)
return None
for addr_tuple in addr_tuples:
logger.debug('adding peer to %s: %s:%s'
% (sha_hash, addr_tuple[0], addr_tuple[1]))
try:
handle.connect_peer((addr_tuple[0], int(addr_tuple[1])), 0)
except Exception as e:
logger.error('cant connect to %s:%s: %s' %
(addr_tuple[0], addr_tuple[1], e))
def _add_torrent_by_info(self, torrentinfo, save_path, resumedata=None):
"""
needed to resume torrents by the saved resume_data
also used by on_generate_torrent
:param torrentinfo: the torrentinfo
:param save_path: path to save
:param resumedata: resume_data
:return:
"""
if not resumedata:
logger.debug("no resume data!")
handle = self.session.add_torrent(
{
'ti': torrentinfo,
'save_path': save_path
})
else:
logger.debug('resuming')
handle = self.session.add_torrent(
{
'ti': torrentinfo,
'save_path': save_path,
'resume_data': resumedata
})
self.handles.append(handle)
self.publish('publish_shares')
[docs] def on_generate_torrent(self, path):
"""
generates a handle for a file or folder
:param path: path to generate a handle for
:type path: str
:return:
"""
logging.info('generating a new torrent for %s in %s' % (
os.path.abspath(path), os.path.abspath(os.path.join(os.path.abspath(path), os.pardir))))
fs = lt.file_storage()
lt.add_files(fs, os.path.abspath(path))
t = lt.create_torrent(fs)
t.set_creator('bitween') # %s' % lt.version)
lt.set_piece_hashes(t,
os.path.abspath(os.path.join(
path,
os.pardir))) # file and the folder it is in
torrent = t.generate()
logger.debug('generated')
info = lt.torrent_info(torrent)
self._add_torrent_by_info(info, save_path=os.path.abspath(os.path.join(
path,
os.pardir)))
[docs] def on_del_torrent(self, hash):
"""
delete a torrent from the current session.
:param handle:
:return: True, or False if not found
"""
for handle in self.handles:
if str(handle.info_hash()) == utf8_encoded(hash):
self.session.remove_torrent(handle)
self.handles.remove(handle)
logger.info('removing handle for %s' % hash)
self.publish('publish_shares')
return True
# else:
# print("%s != %s" % (handle.info_hash(), utf8_encoded(hash)))
# print("%s != %s" % (type(handle.info_hash()), type(utf8_encoded(hash))))
return False
[docs] def save(self, handle, resume_data):
"""
called when the "save_resume_data_alert" is recieved while shutdown.
saves handle with resume_data to database
:param handle:
:param resume_data:
:return:
"""
logger.debug('saving handle %s' % handle.name())
save_path = handle.save_path()
torrent = lt.create_torrent(handle.get_torrent_info())
torfile = lt.bencode(torrent.generate())
magnet = lt.make_magnet_uri(handle.get_torrent_info())
status = lt.bencode(resume_data)
db = sqlite3.connect(self.statdb)
# create table torrents (magnetlink varchar(256), torrent blob, status blob);
c = db.cursor()
c.execute("INSERT OR REPLACE INTO torrents VALUES (?, ?, ?, ?)",
(magnet, sqlite3.Binary(torfile), sqlite3.Binary(status), save_path))
db.commit()
db.close()
[docs] def resume(self):
"""
reads the sessionsettings from db file and sets them,
reads handles from db file and adds them to the session.
:return:
"""
# load state
db = sqlite3.connect(self.statdb)
c = db.cursor()
# load last torrents
erg = c.execute("SELECT * FROM torrents")
for t in erg.fetchall():
logger.info("importing %s" % t[0])
entry = lt.bdecode(bytes(t[1]))
fastresumedata = bytes(t[2])
save_path = t[3]
torrentinfo = lt.torrent_info(entry)
self._add_torrent_by_info(torrentinfo, save_path=save_path, resumedata=fastresumedata)
db.close()
pass
[docs] def erase_all_torrents_from_db(self):
"""
removes all torrents from the session.
called before the current torrents are going to be saved
:return:
"""
db = sqlite3.connect(self.statdb)
c = db.cursor()
c.execute("DELETE FROM torrents")
db.commit()
db.close()