09.21.09

Python – DBUS Communication

Posted in Uncategorized at 3:59 pm by alewo

Da ich im Rahmen meiner Diplomarbeit ja einen Prototyp entwickeln musste, der in Python ein Bluetooth-Scatternet-Discovery Mechanismus umsetzt, war es auch nötig einen Mechanismus zum Zugriff auf die einzelnen Features des Prototyps für externe Anwendungen umzusetzen. Ich habe hierfür eine DBUS-Kommunikation implementiert. Und damit das gewonnene Wissen nicht in meinem Gehirn zu einer Lücke gerinnt, will ich es hier der Allgemeinheit gern zur Verfügung stellen.

Client 1 (Service-Provider)

# -*- coding: ascii -*-

import dbus, dbus.service, dbus.glib
import logging
logging.basicConfig(level=logging.DEBUG)

""" Name of the dbus interfaces"""
DBUS_IFCE = "aDBUSifce"
DBUS_NAME = "aDBUSname"
DBUS_PATH = "/"

class MyDbusInterface(dbus.service.Object):
    def __init__(self ):

        self.bus = dbus.SessionBus()
        self.busname = dbus.service.BusName(DBUS_NAME, bus=self.bus)
        dbus.service.Object.__init__(self, dbus.SessionBus(), DBUS_PATH)

    # ------------------ dbus methods -----------------------------       

    @dbus.service.method(dbus_interface=SCM_DBUS_IFCE, in_signature="s", out_signature="")
    def offered_Service(self, parameter):
        #execute the service request and do the things you should do
        pass

    @dbus.service.method(dbus_interface=SCM_DBUS_IFCE, in_signature="ss", out_signature="")
    def offered_Service_too(self, parameter, secondParameter):
        #execute the service request and do the things you should do
        pass

    # ------------------ dbus signal emitters -----------------------------

    @dbus.service.signal(dbus_interface=SCM_DBUS_IFCE, signature="ss")
    def onCallbackSignal(self, first, second):
        """do something useful like log etc."""
        pass

    @dbus.service.signal(dbus_interface=SCM_DBUS_IFCE, signature="sss")
    def onAnotherCallbackSignal(self, first, second, third):
        """Broadcast signal to notify changes"""
        pass
Client 2 (Service-Consumer)

#!/usr/bin/python2.5

#The client program that connects to dbus and gets messages
import dbus, dbus.glib
import gobject

""" Name of the dbus interfaces"""
DBUS_IFCE = "aDBUSifce"
DBUS_NAME = "aDBUSname"

class TestClient:

	def __init__(self):

		bus = dbus.SessionBus()
		proxyObj = bus.get_object(DBUS_NAME, '/')
		self.dbIfce = dbus.Interface(proxyObj, SCM_DBUS_IFCE)

		#connect to the Remote-Signals for callbacks
		self.dbIfce.connect_to_signal("onCallbackSignal",
                                                         self.executeMethodOnSignal,
                                                         SCM_DBUS_IFCE)
		self.dbIfce.connect_to_signal("onAnotherCallbackSignal",
                                                         self.executeThisForAnotherSignal,
                                                         SCM_DBUS_IFCE)

		self.loop = gobject.MainLoop()

	#--- Methods for Callbacks ---
	def executeMethodOnSignal(self, firstParameter, secondParameter):
		print "  do something like print:  firstParameter=%s,
                                                           secondParameter=%s"
                                                           % (firstParameter, secondParameter)	

	def executeThisForAnotherSignal(self, firstParameter, secondParameter, thirdParameter):
		print "  do something like print:  firstParameter=%s,
                                                           secondParameter=%s,
                                                           thirdParameter=%s"
                                                           % (firstParameter, secondParameter,
                                                           thirdParameter)	

	# --- actions ---
	# Methods for call remote Interfaces (service provider)
	def a_offered_Service(self):
		self.dbIfce.offered_Service("I'm a String Parameter") 

	def another_offered_Service(self):
		result = self.dbIfce.offered_Service_too( "I'm a String Parameter, too",
                                                                       "and one other again")
		print result
		pass

	#--only for using the testclient
	def run(self):
		gobject.timeout_add(1000, self.a_offered_Service)
		gobject.timeout_add(2000, self.another_offered_Service)
		gobject.timeout_add(8000, self.loop.quit)
		self.loop.run()

if __name__ == '__main__':
    tc = TestClient()
    tc.run()

Erklärungen folgen irgendwann via Update... ansonsten Code lesen lernen =)

Leave a Comment

You must be logged in to post a comment.