// 2005/5/9
// AORB.cpp
// artefaktur
// -*- mode:C++; tab-width:2; c-basic-offset:2; indent-tabs-mode:nil -*- 
//
// Copyright (C) 2000-2005 by Roger Rene Kommer / artefaktur, Kassel, Germany.
// 
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public License (LGPL).
// 
// 
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the 
// License ACDK-FreeLicense document enclosed in the distribution
// for more details.
// This file is part of the Artefaktur Component Development Kit:
//                         ACDK
// 
// Please refer to
// - http://www.acdk.de
// - http://www.artefaktur.com
// - http://acdk.sourceforge.net
// for more information.
// 
// $Header: /cvsroot/acdk/acdk/acdkx_orb/src/acdkx/orb/AORB.cpp,v 1.36 2005/04/30 20:18:41 kommer Exp $
#include <acdk.h>
#include <acdk/lang/CmdLineParser.h>
#include <acdk/io/Reader.h>
#include <acdk/io/Writer.h>
#include <acdk/net/InetAddress.h>
#include <acdk/net/Socket.h>
#include <acdk/net/TCPSocket.h>
#include <acdk/net/SocketException.h>
#include <acdk/net/URL.h>

#include <acdk/lang/Thread.h>
#include <acdk/lang/Short.h>

#include "AORB.h"
#include <org/omg/CORBA/GIOP/GIOP.h>
#include <org/omg/CORBA/IIOP/IIOP.h>

#include "CDRObjectReader.h"
#include "GIOPMessage.h"
#include "AServerRequestImpl.h"
#include "ServerDelegate.h"
#include "CorObject.h"

namespace acdkx {
namespace orb {

// DOUT(msg): debug trace helper.
// With LOCAL_DEBUG defined the message is streamed to sys::coreout;
// otherwise the macro expands to an empty statement and msg is never evaluated.
#if defined(LOCAL_DEBUG)
# define DOUT(msg) sys::coreout << msg << sys::eofl
#else
# define DOUT(msg) do { } while(false)
#endif

// Make the ORB, socket and stream classes usable without qualification in this file.
USING_CLASS(::org::omg::CORBA::, ORB);
USING_CLASS(::acdk::net::, Socket);
USING_CLASS(::acdk::net::, ServerSocket);
USING_CLASS(::acdk::io::, Reader);
USING_CLASS(::acdk::io::, Writer);

// Head and tail of the singly linked list of skeleton-info records.
// Records are appended by the RegisterSkelInfoClass constructor and
// walked by AORB::createProxy().
static SkelInfoClassesStruct* _proxyRoot = 0;
static SkelInfoClassesStruct* _proxyTail = 0;

/**
  Remembers the given skeleton-info record and appends it to the
  file-local registry list (_proxyRoot / _proxyTail).

  @param proxy record to register; becomes the new tail of the list
*/
RegisterSkelInfoClass::RegisterSkelInfoClass(SkelInfoClassesStruct* proxy)
: _proxy(proxy)
{
  // An empty list gets its head set, otherwise the old tail is linked to the new record.
  if (_proxyRoot != 0)
    _proxyTail->next = proxy;
  else
    _proxyRoot = proxy;
  _proxyTail = proxy;
}

/**
  Currently does nothing: the record registered by the constructor
  stays linked in the registry list.
*/
RegisterSkelInfoClass::~RegisterSkelInfoClass()
{
  //### to implement
}



/**
  Creates the client side delegate for the object identified by key.

  The class name is derived from the repository id stored in key->type_id.
  Lookup order:
  - the unqualified name "AcdkObject" yields a CorObject,
  - a registered skeleton-info record with the same class name (and, if the
    record has a namespace, the same "ns/name") yields the delegate built by
    the record's creator function,
  - otherwise a CorObject is returned; if a class with that name can be
    loaded it is set as the remote class.

  @param key object key of the remote object
  @return the delegate, or Nil if key->isNil() is true
*/
RServerDelegate 
AORB::createProxy(IN(RObjectKey) key)
{
  if (key->isNil() == true)
    return Nil;
  RString name = ObjectKey::classNameFromRepId(key->type_id);

  // Unqualified class name: everything behind the last '/', or the whole name.
  const char* fullName = name->c_str();
  const char* lastSlash = strrchr(fullName, '/');
  const char* clname = (lastSlash != 0) ? lastSlash + 1 : fullName;

  if (strcmp(clname, "AcdkObject") == 0)
  {
    RServerDelegate sd = new CorObject(key, getORB());
    sd->isClient(true);
    return sd;
  }

  for (SkelInfoClassesStruct* entry = _proxyRoot; entry != 0; entry = entry->next)
  {
    if (strcmp(clname, entry->clazz->name) != 0)
      continue;

    // Records that carry a namespace must also match the fully qualified name.
    if (entry->clazz->ns[0] != 0)
    {
      StringBuffer qualified(entry->clazz->ns);
      qualified.append("/");
      qualified.append(entry->clazz->name);
      if (qualified.toString()->equals(name) == false)
        continue;
    }

    RServerDelegate sd = (RServerDelegate)entry->creator();
    sd->isClient(true);
    sd->setObjectKey(key);
    return sd;
  }

  // No registered skeleton: fall back to a generic CorObject.
  RCorObject cobj = new CorObject(key, getORB());
  try {
    if (name->length() > 0)
      cobj->setRemoteClass(Class::forName(name));
  } catch (...) {
    // best effort only: the CorObject is returned without a remote class
  }
  return &cobj;
}


//static 
/**
  Finds the method info for a method name.

  The methods declared directly in clazz are compared by name first.
  If none matches, each entry of clazz->interfaces is searched for a public
  method with that name and the first hit is returned.

  @param method name of the method to look up
  @param clazz  class info to search
  @return the matching method info, or 0 if nothing was found
*/
const acdk::lang::dmi::ClazzMethodInfo*
AORB::lookupMethod(IN(RString) method, const dmi::ClazzInfo* clazz)
{
  if (method->startsWith("__acdk_dmi_") == true)
  {
    // ### todo
  }
  const dmi::ClazzInfo* ci = clazz;
  int i = 0;
  for (i = 0; ci->methods[i]; i++) 
  {
    if (strcmp(ci->methods[i]->name, method->c_str()) == 0)
      return ci->methods[i];
  }
  for (i = 0; ci->interfaces[i]; i++) 
  {
    ::acdk::lang::sys::core_vector<const acdk::lang::dmi::ClazzMethodInfo*> vec;
    findFunctions(ci->interfaces[i]->type, method, acdk::lang::dmi::MiPublic, vec);
    // Keep looking in the remaining interfaces when this one has no match.
    // The first candidate is at index 0; index 1 is out of range when only
    // one method was found.
    if (vec.size() > 0)
      return vec[0];
  }
  return 0;
}

//static
// The shared ORB instance: created on first use by getAORB(), dropped by reset().
RAORB AORB::_theORB;


//static
/**
  Returns the port number the ORB starts out with.

  Currently this is always 0; the port probing code below is disabled.
  _initServer() later replaces _port with the local port of the server socket.

  @return 0
*/
int 
AORB::_lookupFreePort()
{
  //return 1234;
  return 0;
  /** not used
  int i = StdOrbPort + 1;
  return i;
  ::acdk::net::RInetAddress address = ::acdk::net::InetAddress::getLocalHost();
  while (true) {
    //::acdk::net::TCPSocket socket;
    //socket.create(false);
    try {
      //::acdk::net::ServerSocket socket(i);
      ::acdk::net::Socket sock(address, i);
      sock.close();
      //socket.connect(address, i);
      //socket.close();
      i++;
      continue;
    } catch(::acdk::net::RSocketException ) {
      break;
    }
  }
  System::out->println("Port: " + Integer::toString(i));
  return i;
  */
}

/**
  Creates an ORB that is not yet serving: no server socket, empty object
  maps, shutdown flag cleared. The initial port comes from _lookupFreePort().
*/
AORB::AORB()
: Thread(),
  _port(_lookupFreePort()),
  _objects(new HashMap()),
  _objectsIds(new HashMap()),
  _shutdown(false),
  _treadGroup(new ThreadGroup("ORB")),
  _isServer(false),
  _serverSocket(Nil),
  ignoreLocal(false),
  ignoreDmiOverGIOP(false)
{
}



//static 
/**
  Returns the shared ORB instance.
  On first use the instance is created and registered with
  System::registerStaticReference().
*/
AORB& 
AORB::getAORB()
{
  if (_theORB == Nil)
  {
    _theORB = new AORB();
    System::registerStaticReference(_theORB);
  }
  return *_theORB;
}

/**
  Configures the ORB.

  Steps:
  - use props as property set, or the system properties if props is Nil,
  - if the file "acdkx_orb.cfg" exists in the directory named by the
    "ACDKHOME" property, load it into the property set,
  - let a CmdLineParser process args with the options -ORBIIOPAddr,
    -ORBNamingAddr and -ORBNamingIOR (mapped to the acdkx.orb.* properties),
  - read the listener address from "acdkx.orb.ORBIIOPAddr"; if it is missing
    or empty "inet:localhost:<port>" is used, with <port> taken from
    _lookupFreePort(). Host and port of that address are stored in
    _serverHost and _port.

  @param args  command line arguments
  @param props property set to use, may be Nil
*/
void 
AORB::init(IN(RStringArray) args, IN(acdk::util::RProperties) props)
{
  _properties = props;
  if (props == Nil)
    _properties = System::getProperties();

  ::acdk::io::File pfile(System::getProperty("ACDKHOME"), "acdkx_orb.cfg");
  if (pfile.exists() == true)
  {
    _properties->load(pfile.getReader());
  }
  CmdLineParser cparser;
  cparser.addOption("-ORBIIOPAddr", "acdkx.orb.ORBIIOPAddr", true, "Set the ORB server listner address");
  cparser.addOption("-ORBNamingAddr", "acdkx.orb.ORBNamingAddr", true, "Points to the address of the naming service");
  // The option name must not end with a blank, otherwise it never matches an argument.
  cparser.addOption("-ORBNamingIOR", "acdkx.orb.ORBNamingIOR", true, "Points to the address of the naming service");
  cparser.parse(_properties, args, true, true);

  RString addr = _properties->getProperty("acdkx.orb.ORBIIOPAddr");
  if (addr == Nil || addr->length() == 0)
    addr = "inet:localhost:" + Integer::toString(_lookupFreePort());
  acdk::net::URL url(addr);
  _serverHost = url.getHost();
  _port = url.getPort();
}

/** Returns the port number currently stored in _port. */
int 
AORB::port() 
{ 
  return _port; 
}

//virtual 
/** Not implemented: always returns Nil. */
RStringArray 
AORB::list_initial_services()
{

  return Nil;
}
  
//virtual 
/**
  Resolves an initial reference by name.

  Only "NameService" is handled: its address is read from the property
  "acdkx.orb.ORBNamingAddr" and converted with string_to_object().

  @param identifier name of the initial service
  @return the object for the naming service
  @throw InvalidName for any other identifier, or if the property is
         missing or empty
*/
::org::omg::CORBA::RObject 
AORB::resolve_initial_references(IN(RString) identifier) THROWS1(RInvalidName)
{
  RString namingAddr;
  if (identifier->equals("NameService") == true)
    namingAddr = _properties->getProperty("acdkx.orb.ORBNamingAddr");

  if (namingAddr != Nil && namingAddr->length() > 0)
    return string_to_object(namingAddr);

  THROW0_FQ(org::omg::CORBA::, InvalidName);
  return Nil;
}
  
//virtual 
/**
  Currently a no-op; the former registration code is disabled below.
*/
void 
AORB::connect(IN(::org::omg::CORBA::RObject) obj)
{
  /** not needed?

  ObjectKey objkey("IIOP", acdk::net::InetAddress::getLocalHost()->toString(), _port, obj);
  RString objstring = objkey.toString();
  _objects->put(obj, objstring);
  _objectsIds->put(objstring, obj);
  */
}


//virtual 
/**
  Returns the string form of obj as produced by ServerDelegate::object_to_string().
  The server socket is set up first via _initServer().
*/
RString 
AORB::object_to_string(IN(::org::omg::CORBA::RObject) obj)
{
  _initServer();
  return ServerDelegate::object_to_string(obj);
}


//virtual 
/**
  Converts a stringified object reference into an object.

  Strings starting with "corbaloc:" are handled by corbaloc_to_object(),
  everything else by ServerDelegate::string_to_object().
*/
::org::omg::CORBA::RObject 
AORB::string_to_object(IN(RString) str)
{
  if (str->startsWith("corbaloc:") == false)
    return &ServerDelegate::string_to_object(str);
  return corbaloc_to_object(str);
}


/**
  Creates a proxy for a corbaloc URL.

  Accepted form:
    corbaloc:[iiop]:[<major>.<minor>@]<host>[:<port>][/<key>]

  Defaults: version "1.0", host "localhost", port "2089".
  The part behind the first '/' is used as type id and as object key.

  @param str the corbaloc URL
  @return the proxy created by createProxy()
  @throw BAD_PARAM if str does not start with "corbaloc:" or the version
         contains no '.'
*/
::org::omg::CORBA::RObject 
AORB::corbaloc_to_object(IN(RString) str)
{
  if (str->startsWith("corbaloc:") == false)
    THROW2_FQ(org::omg::CORBA::, BAD_PARAM, 9, org::omg::CORBA::COMPLETED_NO);
  RString url = "";
  RString base = str->substr(strlen("corbaloc:"));
  int idx = base->indexOf('/');
  RString addr = base;
  if (idx != -1)
  {
    url = base->substr(idx + 1);
    addr = base->substr(0, idx);
  }
  // Strip the protocol prefix whether or not a key part follows.
  // startsWith() is also safe on an empty address, unlike charAt(0).
  if (addr->startsWith(":") == true)
    addr = addr->substr(1);
  else if (addr->startsWith("iiop:") == true)
    addr = addr->substr(5);

  RString version = "1.0";
  // NOTE(review): the OMG corbaloc default port is 2809; "2089" looks like a
  // transposition. Confirm against the port the server side uses before changing.
  RString port = "2089";
  RString host = "localhost";
  // [ <version> `@' ] <host> [ `:' <port> ]
  if ((idx = addr->indexOf('@')) != -1)
  {
    version = addr->substr(0, idx);
    addr = addr->substr(idx + 1);
  }
  if ((idx = addr->indexOf(':')) != -1)
  {
    host = addr->substr(0, idx);
    port = addr->substr(idx + 1);
  }
  else if (addr->length() > 0)
  {
    host = addr;
  }
  // an empty address keeps the default host

  if ((idx = version->indexOf('.')) == -1)
    THROW2_FQ(org::omg::CORBA::, BAD_PARAM, 9, org::omg::CORBA::COMPLETED_NO);
  short majorversion = Short::parseShort(version->substr(0, idx));
  short minorversion = Short::parseShort(version->substr(idx + 1));
  RObjectKey objkey = new ObjectKey();
  objkey->type_id = url;
  objkey->object_key = url->getBytes();
  objkey->version.major = majorversion;
  objkey->version.minor = minorversion;
  objkey->protokoll = "inet";
  objkey->network = host;
  // Ports range up to 65535, which a signed short parse rejects.
  objkey->port = Integer::parseInt(port);
  return &createProxy(objkey);
}


/**
  Registers obj as an object served by this ORB.

  Makes sure the server socket exists, casts obj to a ServerDelegate, builds
  an ObjectKey from it and stores that key in _objects twice: once under obj
  and once under the key's objectId().

  @param obj the object to serve
  @return always an empty string
*/
RString 
AORB::impl_is_ready(IN(::acdk::lang::RObject) obj)
{
  _initServer();
  RServerDelegate sd = RServerDelegate(obj);
  RObjectKey objkey = new ObjectKey(sd.iptr());
  _objects->put(obj, (acdk::lang::Object)objkey);
  // NOTE(review): this second entry also goes into _objects, not _objectsIds - confirm this is intended.
  _objects->put((acdk::lang::Object)objkey->objectId(), (acdk::lang::Object)objkey);
  return "";
}
  
/**
  Creates the server socket on first call; later calls do nothing.

  The socket is bound to _serverHost (the local host name if none was
  configured) and _port with a backlog of 30. Afterwards _port is updated
  with the local port reported by the socket.
*/
void
AORB::_initServer()
{
  if (_serverSocket == Nil)
  {
    if (_serverHost == Nil)
      _serverHost = AORB::getLocalHost();
    _serverSocket = new ServerSocket(_port, 30, acdk::net::InetAddress::getByName(_serverHost));
    _port = _serverSocket->getLocalPort();
    DOUT("AORB::_initServer: " << _serverSocket->toString()->c_str() << "; port=" << _port);
  }
}

//virtual 
/** Always reports true. */
bool 
AORB::work_pending()
{
  
  return true;
}

//virtual 
/** Empty implementation: does nothing. */
void 
AORB::perform_work()
{
  
}


ACDK_DECL_CLASS(ClientConnection);
/**
  Thread that handles the requests arriving on one accepted client socket.
  AORB::run() creates and starts one instance per accepted connection.
*/
class ClientConnection 
: public ::acdk::lang::Thread
{
  /** the ORB the requests are dispatched with */
  RAORB _orb;
  /** the accepted connection to the client */
  RSocket _clientSocket;
public:
  /**
    @param orb    ORB that accepted the connection
    @param clsock socket of the accepted connection
  */
  ClientConnection(IN(RAORB) orb, IN(RSocket) clsock)
  : Thread(/*orb->threadGroup()*/),
    _orb(orb),
    _clientSocket(clsock)
  {
  }
  /** Reads and dispatches messages until an exception ends the loop. */
  virtual void run();
};

//virtual 
void 
ClientConnection::run()
{
  //System::out->println("ClientConnection::run()");
  
  RReader in = _clientSocket->getInputStream();
  RWriter out = _clientSocket->getOutputStream();

  //RCDRObjectReader cdrin = new CDRObjectReader(in, (::org::omg::CORBA::RORB)_orb);
  try {
    do {
      RCDRObjectReader cdrin;
      RGIOPMessage mess = GIOPMessage::readMessage((::org::omg::CORBA::RORB)_orb, in, cdrin);
      RAServerRequestImpl serverrequest = new AServerRequestImpl((::org::omg::CORBA::RORB)_orb, 
                                                      (RGIOPRequestMessage)mess, cdrin, out);
      mess->handleCall(serverrequest);
    } while (true);
  } catch (RThrowable ex) {
    System::out->println(ex->getMessage());
  }
  //_clientSocket->close(); don't close it here, let client do
}


//virtual 
/**
  Accept loop of the ORB.

  Requires the server socket to exist. Each accepted connection is handed
  to a new ClientConnection thread. The loop ends when _doShutdown()
  reports true; the server socket is closed afterwards.

  @throw Exception if the server socket has not been created yet
*/
void 
AORB::run()
{
  if (_serverSocket == Nil)
    THROW1(Exception, "_serverSocket is not initialized! No acdk::lang::Object registered?");

  _isServer = true;
  while (_doShutdown() == false) 
  {
    RSocket clsock = _serverSocket->accept();

    // accept() may have been unblocked just to end the loop
    if (_doShutdown() == true)
      break;
    if (clsock == Nil)
      continue;

    RClientConnection client_thread = new ClientConnection(this, clsock);
    client_thread->start();
  }
  _serverSocket->close();
  _isServer = false;
}

//virtual 
/**
  Requests the accept loop to end.

  Sets the shutdown flag. If the ORB is currently serving, a short lived
  connection to the own server socket is opened to unblock accept() in
  AORB::run().

  @param wait_for_completion currently has no effect
*/
void 
AORB::shutdown(bool wait_for_completion)
{
  _shutdown = true;
  if (_isServer == false)
    return;

  // connect to ourselves so that the blocking accept() returns
  ::acdk::net::Socket tsock(true);
  tsock.connect(_serverSocket->getInetAddress(), _serverSocket->getLocalPort(), 200);
  // ### wait_for_completion is not evaluated yet
}

//virtual 
/** Empty implementation: does nothing. */
void 
AORB::destroy()
{
}

//static 
/**
  Drops the shared ORB instance, if there is one.
  A warning is printed when the reference count of the instance is not 1.
*/
void 
AORB::reset()
{
  if (_theORB != Nil)
  {
    if (_theORB->refCount() != 1)
      System::out->println("**  WARN: AORB::reset(): Warning ORB is used somewhere");
    _theORB = Nil;
  }
}

/**
  Maps the raw object key of a request to the object it refers to.

  The key bytes are read as an ASCII string and parsed into an ObjectKey.

  @param object_key raw key bytes
  @return the local object if the key is local, otherwise Nil
*/
acdk::lang::Object 
AORB::object_key_to_object(sequence<octet>& object_key)
{
  RString keyText = new String((const char*)object_key.data(), object_key.size(), NormalSST | CCAscii);
  ObjectKey parsedKey(keyText);
  if (parsedKey.isLocal() == false)
    return Nil; // return Skelleton here
  return parsedKey.getLocalObject();
}

/**
  Finds the target object of a server request.

  If the minor version in the message header is below 2 the object key of
  the request header is used. Otherwise the target address is evaluated,
  of which only the KeyAddr disposition is supported.

  @param req the incoming request
  @return the target object, or Nil if it cannot be resolved
*/
acdk::lang::Object 
AORB::resolve_object(AServerRequestImpl& req)
{
  if (req.inMessage()->header().version.minor < 2)
    return object_key_to_object(req.inMessage()->requestHeader().object_key);

  if (req.inMessage()->requestHeader().target_address.addressingDisposition 
        == org::omg::CORBA::GIOP::TargetAddress::KeyAddr)
    return object_key_to_object(req.inMessage()->requestHeader().target_address.object_key);

  // ProfileAddr and ReferenceAddr are not resolved
  return Nil;
}

/**
  Tells whether key points to this ORB, i.e. its port equals _port and its
  network equals the string returned by getLocalHost().
*/
bool 
AORB::isOwnObjectId(IN(RObjectKey) key)
{
  const bool samePort = (key->port == _port);
  return samePort && key->network->equals(getLocalHost());
  /*
  if (_objectsIds->get(key->objectId()) != Nil)
    return true;
  */
}

//static 
/** Returns the string form of acdk::net::InetAddress::getLocalHost(). */
RString 
AORB::getLocalHost()
{
  return acdk::net::InetAddress::getLocalHost()->toString();
}

//static 
/** Returns port() of the shared ORB instance; the instance is created if needed. */
int 
AORB::getLocalPort()
{
  return getAORB().port();
}


} // namespace orb
} // namespace acdkx