09.21.09
Posted in Uncategorized at 3:59 pm by alewo
Da ich im Rahmen meiner Diplomarbeit ja einen Prototyp entwickeln musste, der in Python einen Bluetooth-Scatternet-Discovery-Mechanismus umsetzt, war es auch nötig, einen Mechanismus zum Zugriff auf die einzelnen Features des Prototyps für externe Anwendungen umzusetzen. Ich habe hierfür eine D-Bus-Kommunikation implementiert. Und damit das gewonnene Wissen nicht in meinem Gehirn zu einer Lücke gerinnt, will ich es hier gern der Allgemeinheit zur Verfügung stellen.
| Client 1 (Service-Provider) |
# -*- coding: ascii -*-
import dbus, dbus.service, dbus.glib
import logging
# Configure the root logger so that DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)

# D-Bus identifiers used by the service provider:
#   DBUS_IFCE - interface on which the methods and signals are declared
#   DBUS_NAME - bus name requested on the session bus
#   DBUS_PATH - object path under which the service object is exported
# NOTE(review): the D-Bus specification requires interface and bus names to
# contain at least one '.' (e.g. "org.example.Ifce"); these look like
# placeholder values - confirm and replace consistently in provider and
# consumer before real use.
DBUS_IFCE = "aDBUSifce"
DBUS_NAME = "aDBUSname"
DBUS_PATH = "/"
class MyDbusInterface(dbus.service.Object):
    """Service provider that exports methods and signals on the session bus.

    The object requests the bus name DBUS_NAME and is exported at the object
    path DBUS_PATH. Every method and signal below is declared on the
    DBUS_IFCE interface.
    """

    def __init__(self):
        """Connect to the session bus, request DBUS_NAME and export this object."""
        self.bus = dbus.SessionBus()
        # Clients locate this service through the bus name requested here.
        self.busname = dbus.service.BusName(DBUS_NAME, bus=self.bus)
        # Export on the connection opened above instead of asking for a
        # second SessionBus() handle.
        dbus.service.Object.__init__(self, self.bus, DBUS_PATH)

    # ------------------ dbus methods -----------------------------
    @dbus.service.method(dbus_interface=DBUS_IFCE, in_signature="s", out_signature="")
    def offered_Service(self, parameter):
        """Remotely callable method taking one string; returns nothing.

        Args:
            parameter: string argument supplied by the caller.
        """
        # execute the service request and do the things you should do
        pass

    @dbus.service.method(dbus_interface=DBUS_IFCE, in_signature="ss", out_signature="")
    def offered_Service_too(self, parameter, secondParameter):
        """Remotely callable method taking two strings; returns nothing.

        Args:
            parameter: first string argument supplied by the caller.
            secondParameter: second string argument supplied by the caller.
        """
        # execute the service request and do the things you should do
        pass

    # ------------------ dbus signal emitters -----------------------------
    @dbus.service.signal(dbus_interface=DBUS_IFCE, signature="ss")
    def onCallbackSignal(self, first, second):
        """Signal carrying two strings.

        The body is a placeholder for something useful such as logging.

        Args:
            first: first string payload of the signal.
            second: second string payload of the signal.
        """
        pass

    @dbus.service.signal(dbus_interface=DBUS_IFCE, signature="sss")
    def onAnotherCallbackSignal(self, first, second, third):
        """Broadcast signal carrying three strings, used to notify changes.

        Args:
            first: first string payload of the signal.
            second: second string payload of the signal.
            third: third string payload of the signal.
        """
        pass
|
| Client 2 (Service-Consumer) |
#!/usr/bin/python2.5
#The client program that connects to dbus and gets messages
import dbus, dbus.glib
import gobject
""" Name of the dbus interfaces"""
# D-Bus identifiers used by the client:
#   DBUS_IFCE - interface name (declared here for the remote interface)
#   DBUS_NAME - bus name passed to bus.get_object() to find the service
# NOTE(review): the D-Bus specification requires interface and bus names to
# contain at least one '.'; these look like placeholder values - confirm.
DBUS_IFCE = "aDBUSifce"
DBUS_NAME = "aDBUSname"
class TestClient:
    """Service consumer: calls the remote methods and listens for the signals."""

    def __init__(self):
        """Look up the remote object on the session bus and subscribe to its signals."""
        bus = dbus.SessionBus()
        proxyObj = bus.get_object(DBUS_NAME, '/')
        self.dbIfce = dbus.Interface(proxyObj, DBUS_IFCE)
        # connect to the remote signals for callbacks
        self.dbIfce.connect_to_signal("onCallbackSignal",
                                      self.executeMethodOnSignal,
                                      dbus_interface=DBUS_IFCE)
        self.dbIfce.connect_to_signal("onAnotherCallbackSignal",
                                      self.executeThisForAnotherSignal,
                                      dbus_interface=DBUS_IFCE)
        self.loop = gobject.MainLoop()

    # --- Methods for Callbacks ---
    def executeMethodOnSignal(self, firstParameter, secondParameter):
        """Handler for "onCallbackSignal": print both signal arguments."""
        # Adjacent literals are concatenated, so the message stays one string
        # while the source line stays short.
        print(" do something like print: firstParameter=%s, "
              "secondParameter=%s"
              % (firstParameter, secondParameter))

    def executeThisForAnotherSignal(self, firstParameter, secondParameter, thirdParameter):
        """Handler for "onAnotherCallbackSignal": print all three signal arguments."""
        print(" do something like print: firstParameter=%s, "
              "secondParameter=%s, "
              "thirdParameter=%s"
              % (firstParameter, secondParameter, thirdParameter))

    # --- actions ---
    # Methods that call the remote interface (service provider)
    def a_offered_Service(self):
        """Call the remote offered_Service method with a fixed string.

        Returns None, so when used as a timeout callback it is not
        rescheduled.
        """
        self.dbIfce.offered_Service("I'm a String Parameter")

    def another_offered_Service(self):
        """Call the remote offered_Service_too method and print its result."""
        result = self.dbIfce.offered_Service_too("I'm a String Parameter, too",
                                                 "and one other again")
        print(result)

    # -- only for using the testclient
    def run(self):
        """Schedule both remote calls, then run the main loop for 8 seconds."""
        gobject.timeout_add(1000, self.a_offered_Service)
        gobject.timeout_add(2000, self.another_offered_Service)
        # Stop the loop after 8000 ms so the test client terminates by itself.
        gobject.timeout_add(8000, self.loop.quit)
        self.loop.run()
# Script entry point: build the test client and run its main loop.
if __name__ == '__main__':
    TestClient().run()
|
Erklärungen folgen irgendwann via Update... ansonsten Code lesen lernen =)
Permalink