# maxima.py -- source file of the mascyma package.
# (Recovered from a Doxygen-generated source listing.)

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c)2003, Matthias A. Benkard.

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from core import common
from core.common import _

import logging
import os
import re
import string
import threading
import time

# Module-wide logger.
log = logging.getLogger("core.maxima")

# Message type codes.  Maxima.receive() reports one of these as the second
# element of the (id, type, message) triple it returns.
UNKNOWN, NONE, RESULT, INPUT, WELCOME = -1, 0, 1, 2, 3

# Session state codes, as stored in MaximaThread.maxima_state.
NOT_RUNNING, STARTING, RUNNING, DEAD, PROCESSING = 0, 1, 2, 3, 4

# IDLE is an alias of RUNNING: both names carry the value 2.
IDLE = RUNNING


class MaximaException(Exception):
      """An exception raised by a Maxima error.

      Base class of the Maxima-specific exceptions defined in this module.
      """

      def __init__(self, str = ""):
            """Create the exception.

            str -- human-readable message; defaults to the empty string.
            """
            # 'str' shadows the builtin, but the name is part of the public
            # signature (it may be passed by keyword), so it is kept.
            Exception.__init__(self, str)


class SyntaxException(MaximaException):
      """A syntax error.

      Raised by Maxima.receive() when Maxima's output starts with an
      'Incorrect syntax:' message.
      """

      def __init__(self, str):
            """Create the exception.

            str -- the error message; unlike in the base class it is required.
            """
            MaximaException.__init__(self, str)


class MaximaError(MaximaException):
      """Maxima went MAXIMA>>.

      Raised by Maxima.receive() when the output read so far ends in the
      'MAXIMA>>' prompt, which the reader treats as an error inside Maxima.
      """

      def __init__(self, str = ""):
            """Create the exception.

            str -- human-readable message; defaults to the empty string.
            """
            MaximaException.__init__(self, str)


class Maxima:
      """An interface to a Maxima session.

      The session talks to a Maxima child process through pipes.  Attribute
      access is overloaded: reading or assigning an attribute whose name does
      not start with '_' reads or assigns the Maxima variable of that name
      (by sending a command) instead of touching a Python attribute.
      """

      def __init__(self, maxima_bin, display2d = False):
            """Start Maxima and wait for its first input prompt.

            maxima_bin -- name or path of the Maxima executable.
            display2d  -- value assigned to Maxima's display2d variable.

            Raises Exception if running 'maxima_bin --version' fails.
            """
            # Pre-initialization.  Names starting with '_' are stored on the
            # instance (see __setattr__) rather than sent to Maxima.
            self._lock = threading.Lock()

            # Check if Maxima is where we assume it and works.  os.spawnlp is
            # not available everywhere, hence the fallback to os.spawnl.
            spawn = getattr(os, "spawnlp", None)
            if spawn is None:
                  spawn = os.spawnl
            if spawn(os.P_WAIT, maxima_bin, maxima_bin, "--version") != 0:
                  raise Exception(_("'%s' could not be run") % maxima_bin)

            # Start Maxima.
            self.__stdin, self.__stdout, self.__stderr = os.popen3(maxima_bin)

            # Consume Maxima's welcome message so that the next receive()
            # only sees the reply to the first command.
            self.receive()

            # Not a Python attribute: this sends "display2d: <value>" to Maxima.
            self.display2d = display2d

      def _is_bound_method(self, name):
            """Tell whether 'name' is stored on the instance as a bound method."""
            try:
                  attr = self.__dict__[name]
            except KeyError:
                  # What doesn't exist can't be a function, right?
                  return False
            return attr.__class__ == self.destroy.__class__

      def __setattr__(self, name, value):
            """Store private names locally; assign every other name in Maxima.

            Names starting with '_' (and names already holding a bound method
            on the instance) become ordinary instance attributes.  Any other
            name is sent to Maxima as the assignment 'name: value'.
            """
            if name.startswith('_') or self._is_bound_method(name):
                  self.__dict__[name] = value
            else:
                  self.send_command("%s: %s" % (name, str(value)))

      def __getattr__(self, name):
            """Look up an attribute that normal lookup did not find.

            Names starting with '_' are never forwarded to Maxima; a missing
            one raises AttributeError, as the attribute protocol requires
            (hasattr() and implicit special-method lookups rely on it).
            Any other name is evaluated by Maxima and the message text of the
            reply is returned, with 'TRUE' and 'FALSE' mapped to True/False.
            """
            if name.startswith('_'):
                  try:
                        return self.__dict__[name]
                  except KeyError:
                        raise AttributeError(name)
            if self._is_bound_method(name):
                  return self.__dict__[name]

            # receive() returns (id, type, message); the text is element 2.
            result = self.send_command(name)[2]
            if result == 'TRUE':
                  return True
            elif result == 'FALSE':
                  return False
            else:
                  return result

      def destroy(self):
            """Ask Maxima to quit without waiting for any reply."""
            # A statement is only executed once it is terminated, so send
            # the terminator and line ending just like send_command() does.
            self.__stdin.write("quit();\r\n")
            self.__stdin.flush()

      def send_command(self, command):
            """Send one command to Maxima and return the reply.

            command -- the command text; ';' is appended unless the command
                       already ends in ';' or '$'.

            Returns whatever receive() returns and raises what it raises.
            """
            # Normalize the command before taking the lock so that nothing
            # can fail between acquire() and the try block.  Slicing keeps
            # an empty command from raising IndexError.
            if command[-1:] not in ('$', ';'):
                  command += ';'
            command += "\r\n"

            self._lock.acquire()
            try:
                  log.debug(_("Sending: %s") % command)
                  self.__stdin.write(command)
                  self.__stdin.flush()
                  return self.receive()
            finally:
                  self._lock.release()

      def receive(self):
            """Receives MAXIMA output.

            Reads Maxima's output up to the next '(Cn) ' input prompt.

            Returns the triple (id, type, message):
              (n, RESULT, text)  -- the output carried a '(Dn)' label; text is
                                    the output with the label blanked out and
                                    the common left margin removed.
              (0, WELCOME, text) -- the output carried no '(Dn)' label.
              (0, NONE, text)    -- Maxima closed its output before showing
                                    a prompt; text is whatever was read.

            Raises MaximaError if the output ends in 'MAXIMA>>' and
            SyntaxException if it starts with 'Incorrect syntax:'.
            """
            prompt_rex = re.compile(r'^\(C[0-9]+\) ', re.MULTILINE)
            output = ""
            prompt = None
            while prompt is None:
                  char = self.__stdout.read(1)
                  if not char:
                        # End of file: Maxima is gone (for instance after
                        # 'quit()').  Without this check the loop would
                        # spin forever on empty reads.
                        log.debug(_("Maxima closed its output stream."))
                        return 0, NONE, output
                  output += char
                  if output[-8:] == 'MAXIMA>>':
                        # Woah, an error occurred!
                        # We'll recover and throw an exception.
                        self.__stdin.write(":resume\r\n")
                        self.__stdin.flush()
                        raise MaximaError()
                  # The prompt pattern ends in a space, so it can only have
                  # become complete when a space was just read.
                  if char == ' ':
                        prompt = prompt_rex.search(output)

            syntax_error = re.compile('Incorrect syntax: (.*)').match(output)
            if syntax_error:
                  raise SyntaxException(syntax_error.group(0))

            # Get everything up to '(Cn)'.
            output = output[:prompt.start()]

            # Slicing instead of indexing: the output may be empty here.
            if output[:1] == "\n":
                  output = output[1:]

            # If there _is_ no (Dn), there was no input => thus, what we have at
            # our hands here is the welcome message of MAXIMA.
            # We needn't parse this.
            label = re.compile(r'\(D[0-9]+\)', re.MULTILINE).search(output)
            if not label:
                  log.debug(_("Got a welcome message from MAXIMA. Well, hello, comrade! :)"))
                  return 0, WELCOME, output

            # We need to isolate the id of the output: "(D12)" -> 12.
            dn = label.group(0)
            result_id = int(dn[2:-1])
            log.debug(_("Got id %i.") % result_id)

            # If DISPLAY2D is set to TRUE, (D*) does not need to be at the
            # beginning of the output.  Overwrite it with the same number of
            # spaces so that the column layout of the result is preserved.
            output = output.replace(dn, " " * len(dn), 1)

            # Expand tabs the way a terminal would, then drop the left
            # margin shared by all lines.
            lines = [line.expandtabs() for line in output.split("\n")]
            lines = self._strip_margin(lines)

            # Reassemble.  The text before the prompt ends in a newline, so
            # the last line is empty; drop the newline it leaves behind.
            output = "\n".join(lines)
            if output[-1:] == "\n":
                  output = output[:-1]

            # At last! Return! Warp!
            return result_id, RESULT, output

      def _strip_margin(self, lines):
            """Return 'lines' with their common run of leading spaces removed.

            Only lines containing something other than spaces determine the
            margin; blank lines simply end up empty.
            """
            indents = [len(line) - len(line.lstrip(" "))
                       for line in lines if line.strip(" ")]
            if indents:
                  margin = min(indents)
            else:
                  # Nothing but blanks: strip everything rather than loop
                  # forever looking for a first visible character.
                  margin = max([len(line) for line in lines] + [0])
            return [line[margin:] for line in lines]


class MaximaThread(threading.Thread):
      """A daemon thread that owns a Maxima session.

      run() starts Maxima, then polls until maxima_quit() is called and
      finally shuts Maxima down.  Status changes are reported to an optional
      window object through its notify() method.
      """

      # Number of threads created so far; used to give each a unique name.
      _threadnum = 0

      def __init__(self, window = None):
            """Prepare the thread; Maxima itself is started by run().

            window -- object whose notify() method receives status updates,
                      or None to skip notifications.
            """
            threading.Thread.__init__(self)
            self.setName("MaximaThread%i" % MaximaThread._threadnum)
            self.setDaemon(True)

            self.log = logging.getLogger("core.maxima." + self.getName())
            MaximaThread._threadnum += 1
            self.status = _("Thread not started")
            self.maxima_state = NOT_RUNNING
            self.error = None
            self.window = window
            # Set by run() once Maxima has started; stays None on failure.
            self.maxima = None

            self.should_quit = False

      def _notify(self, *args):
            """Forward a notification to the window, if there is one."""
            if self.window is not None:
                  self.window.notify(self, *args)

      def _report(self, status, error):
            """Record status and error, then notify the window about both."""
            self.error = error
            self.status = status
            self._notify(self.status, self.error)

      def run(self):
            """Start Maxima, idle until asked to quit, then shut Maxima down."""
            # Initialize Maxima.
            self.log.info(_("Running Maxima."))
            self.status = _("Starting Maxima.")
            self.maxima_state = STARTING
            self._notify(self.status, self.error)
            try:
                  self.maxima = Maxima(common.pref["maxima_exe"])
            except MaximaException as e:
                  message = _("A MaximaException occurred.")
                  self.log.debug(message)
                  self._report(message, e)
            except Exception as e:
                  message = _("A non-Maxima exception occurred.")
                  self.log.debug(message)
                  self._report(message, e)
            else:
                  message = _("Maxima started.")
                  self.log.debug(message)
                  self.maxima_state = IDLE
                  self._report(message, None)

            # Poll for state changes made by other threads (see set_state)
            # and pass them on to the window until we are asked to quit.
            state = self.maxima_state
            while not self.should_quit:
                  if state != self.maxima_state:
                        # Maxima's state has changed.
                        state = self.maxima_state
                        self._notify("")

                  time.sleep(0.2)

            self.log.info(_("Shutting down Maxima."))
            # Identity test on purpose: truth-testing a Maxima object could
            # be routed through its attribute overloading.
            if self.maxima is None:
                  # Startup failed, so there is no session to shut down.
                  # Keep the failure status and error; only mark the end.
                  self.maxima_state = DEAD
                  self._notify(self.status, self.error)
                  return
            try:
                  self.maxima.send_command("quit()")
            except MaximaException as e:
                  self._report(_("A MaximaException occurred."), e)
            else:
                  self.maxima_state = DEAD
                  self._report(_("Maxima was shut down cleanly."), None)

      def set_state(self, state):
            """Set the session state code (one of the module's state constants)."""
            self.maxima_state = state

      def maxima_send_command(self, command):
            """Send a command to Maxima and return (id, type, result, error).

            On success error is None and the other three values come from
            Maxima.send_command().  If a MaximaException is raised it is
            returned as error (and reported to the window) together with the
            placeholders (0, UNKNOWN, "").  The state is PROCESSING while
            the command runs and IDLE afterwards, even if it fails.
            """
            self.set_state(PROCESSING)
            # Placeholders keep the return statement valid on failure.
            msg_id, msg_type, result = 0, UNKNOWN, ""
            error = None
            try:
                  msg_id, msg_type, result = self.maxima.send_command(command)
            except MaximaException as e:
                  error = e
                  self._report(_("A MaximaException occurred."), e)
            finally:
                  self.set_state(IDLE)
            return msg_id, msg_type, result, error

      def maxima_quit(self):
            """Ask run() to leave its polling loop and shut Maxima down."""
            self.should_quit = True

# Listing generated by Doxygen 1.6.0.